Nacos源碼 (6) Grpc概述與Nacos集成

来源:https://www.cnblogs.com/xugf/archive/2023/09/18/17711539.html
-Advertisement-
Play Games

Nacos 2.x版本增加了GRPC服務介面和客戶端,極大的提升了Nacos的性能,本文將簡單介紹grpc-java的使用方式以及Nacos中集成GRPC的方式。 grpc-java GRPC是google開源的、以protobuf作為序列化方式、以http2作為通信協議的高性能rpc框架。 grp ...


Nacos 2.x版本增加了GRPC服務介面和客戶端,極大的提升了Nacos的性能,本文將簡單介紹grpc-java的使用方式以及Nacos中集成GRPC的方式。

grpc-java

GRPC是google開源的、以protobuf作為序列化方式、以http2作為通信協議的高性能rpc框架。

grpc-java是grpc對java語言的實現,使用Netty/Okhttp作為通信組件。

使用方式

添加依賴

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty-shaded</artifactId>
  <version>1.56.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-protobuf</artifactId>
  <version>1.56.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-stub</artifactId>
  <version>1.56.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
  <groupId>org.apache.tomcat</groupId>
  <artifactId>annotations-api</artifactId>
  <version>6.0.53</version>
  <scope>provided</scope>
</dependency>

生成代碼

需要將.proto文件放到src/main/proto或src/test/proto目錄下。

然後添加生成代碼使用的插件:

For protobuf-based codegen integrated with the Maven build system, you can use protobuf-maven-plugin (Eclipse and NetBeans users should also look at os-maven-plugin's IDE documentation):

<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.7.1</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}</pluginArtifact>
        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
        <clearOutputDirectory>false</clearOutputDirectory>
        <skip>false</skip>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

生成代碼:

mvn clean compile

High-level組件

At a high level there are three distinct layers to the library: Stub, Channel, and Transport.

Stub

The Stub layer is what is exposed to most developers and provides type-safe bindings to whatever datamodel/IDL/interface you are adapting. gRPC comes with a plugin to the protocol-buffers compiler that generates Stub interfaces out of .proto files, but bindings to other datamodel/IDL are easy and encouraged.

Channel

The Channel layer is an abstraction over Transport handling that is suitable for interception/decoration and exposes more behavior to the application than the Stub layer. It is intended to be easy for application frameworks to use this layer to address cross-cutting concerns such as logging, monitoring, auth, etc.

Transport

The Transport layer does the heavy lifting of putting and taking bytes off the wire. The interfaces to it are abstract just enough to allow plugging in of different implementations. Note the transport layer API is considered internal to gRPC and has weaker API guarantees than the core API under package io.grpc.

gRPC comes with multiple Transport implementations:

  • The Netty-based HTTP/2 transport is the main transport implementation based on Netty. It is not officially supported on Android.
  • The OkHttp-based HTTP/2 transport is a lightweight transport based on Okio and forked low-level parts of OkHttp. It is mainly for use on Android.
  • The in-process transport is for when a server is in the same process as the client. It is used frequently for testing, while also being safe for production use.
  • The Binder transport is for Android cross-process communication on a single device.

四種通信模式

  • 簡單rpc - 一個請求一個響應

    rpc getRealNameByUsername (StudentRequest) returns (StudentResponse) {}
    
  • 服務端流式rpc - 服務端流式響應

    rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
    
  • 客戶端流式rpc - 客戶端流式請求

    rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
    
  • 雙向流rpc

    rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
    

高級應用

  1. 攔截器
  2. Stream Tracer - 流攔截器
  3. Retry Policy - 客戶端重試
  4. NameResolver - 服務發現
  5. 負載均衡
  6. grpc與微服務:與dubbo、gateway、jwt、nacos2.x、openfeign

基礎示例

本小節將使用簡單的示例說明grpc-java的使用方法。

.proto文件

.proto文件需要放在src/main/proto目錄下麵:

syntax = "proto3";

package org.net5ijy.grpc.auto;

option java_package = "org.net5ijy.grpc.auto";
option java_outer_classname = "StudentRpc";
option java_multiple_files = true;

service StudentService {
  rpc getRealNameByUsername (StudentUsernameRequest) returns (StudentResponse) {}
  rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
  rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
  rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
}

message StudentUsernameRequest {
  string username = 1;
}

message StudentResponse {
  string realName = 1;
}

message StudentResponseList {
  repeated StudentResponse studentResponse = 1;
}

pom依賴

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.56.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.56.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.56.0</version>
</dependency>

pom插件

<build>
    <finalName>${project.artifactId}</finalName>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.7.1</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <!-- mvn protobuf:compile -->
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>
                    com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}
                </protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>
                    io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}
                </pluginArtifact>
                <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                <clearOutputDirectory>false</clearOutputDirectory>
                <skip>false</skip>
            </configuration>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

編譯生成代碼

mvn clean compile

編寫業務實現類

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {

  @Override
  public void getRealNameByUsername(StudentUsernameRequest request,
      StreamObserver<StudentResponse> responseObserver) {

    String username = request.getUsername();
    System.out.printf("username=%s\n", username);

    StudentResponse response = StudentResponse.newBuilder().setRealName("徐國峰").build();

    responseObserver.onNext(response);
    responseObserver.onCompleted();
  }

  @Override
  public void getRealNameByUsernameLike(StudentUsernameRequest request,
      StreamObserver<StudentResponse> responseObserver) {

    String username = request.getUsername();
    System.out.printf("username=%s\n", username);

    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰1").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰2").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰3").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰4").build());
    responseObserver.onCompleted();
  }

  @Override
  public StreamObserver<StudentUsernameRequest> getRealNameByUsernames(
      StreamObserver<StudentResponseList> responseObserver) {
    return new StreamObserver<StudentUsernameRequest>() {
      @Override
      public void onNext(StudentUsernameRequest request) {
        System.out.printf("username=%s\n", request.getUsername());
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }

      @Override
      public void onCompleted() {
        StudentResponse response1 = StudentResponse.newBuilder().setRealName("徐國峰5").build();
        StudentResponse response2 = StudentResponse.newBuilder().setRealName("徐國峰6").build();
        StudentResponse response3 = StudentResponse.newBuilder().setRealName("徐國峰7").build();
        StudentResponse response4 = StudentResponse.newBuilder().setRealName("徐國峰8").build();

        StudentResponseList responseList = StudentResponseList.newBuilder()
            .addStudentResponse(response1)
            .addStudentResponse(response2)
            .addStudentResponse(response3)
            .addStudentResponse(response4)
            .build();

        responseObserver.onNext(responseList);
        responseObserver.onCompleted();
      }
    };
  }

  @Override
  public StreamObserver<StudentUsernameRequest> getRealNamesByUsernames(
      StreamObserver<StudentResponse> responseObserver) {
    return new StreamObserver<StudentUsernameRequest>() {
      @Override
      public void onNext(StudentUsernameRequest request) {
        System.out.printf("username=%s\n", request.getUsername());
        StudentResponse response = StudentResponse.newBuilder()
            .setRealName("徐國峰" + new Random().nextInt(10)).build();
        responseObserver.onNext(response);
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }

      @Override
      public void onCompleted() {
        responseObserver.onCompleted();
      }
    };
  }
}

Server代碼

public class StudentGrpcServer {

  private static final AtomicInteger COUNT = new AtomicInteger(0);

  static final int GRPC_SERVER_PORT = 50051;

  private Server server;

  private void start() throws IOException {

    // grpc server executor
    Executor executor = new ThreadPoolExecutor(8, 16, 120, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        r -> {
          Thread t = new Thread(r);
          t.setName("stu-grpc-server-" + COUNT.incrementAndGet());
          return t;
        });

    /* The port on which the server should run */
    this.server = ServerBuilder.forPort(GRPC_SERVER_PORT).executor(executor)
        .compressorRegistry(CompressorRegistry.getDefaultInstance())
        .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
        .addService(new StudentServiceImpl())
        .intercept(serverInterceptor())
        .addTransportFilter(serverTransportFilter())
        .build();

    this.server.start();

    System.out.println("Server started, listening on " + GRPC_SERVER_PORT);

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down");
      try {
        StudentGrpcServer.this.stop();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.err.println("*** server shut down");
    }));
  }

  private void stop() throws InterruptedException {
    if (this.server != null && !this.server.isShutdown()) {
      this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }

  private void blockUntilShutdown() throws InterruptedException {
    if (this.server != null) {
      this.server.awaitTermination();
    }
  }

  private ServerInterceptor serverInterceptor() {
    return new ServerInterceptor() {
      @Override
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
          Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        Context ctx = Context.current();
        return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);
      }
    };
  }

  private ServerTransportFilter serverTransportFilter() {
    return new ServerTransportFilter() {
      @Override
      public Attributes transportReady(Attributes transportAttrs) {
        return super.transportReady(transportAttrs);
      }

      @Override
      public void transportTerminated(Attributes transportAttrs) {
        super.transportTerminated(transportAttrs);
      }
    };
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    final StudentGrpcServer server = new StudentGrpcServer();
    server.start();
    server.blockUntilShutdown();
  }
}

Client代碼

public class StudentGrpcClient {

  private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub;
  private final StudentServiceGrpc.StudentServiceStub stub;

  public StudentGrpcClient(Channel channel) {
    // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
    // shut it down.

    // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
    this.blockingStub = StudentServiceGrpc.newBlockingStub(channel);
    this.stub = StudentServiceGrpc.newStub(channel);
  }

  public void getRealNameByUsername(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {
      StudentResponse response = this.blockingStub.getRealNameByUsername(request);
      System.out.printf("Real name=%s\n", response.getRealName());
    } catch (StatusRuntimeException e) {
      System.err.println(e.getMessage());
    }
  }

  public void getRealNameByUsernameLike(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {
      Iterator<StudentResponse> iterator = this.blockingStub.getRealNameByUsernameLike(request);
      iterator.forEachRemaining(r -> System.out.printf("Real name=%s\n", r.getRealName()));
    } catch (StatusRuntimeException e) {
      System.err.println(e.getMessage());
    }
  }

  public void getRealNameByUsernames(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {

      StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
          .getRealNameByUsernames(new StreamObserver<StudentResponseList>() {
            @Override
            public void onNext(StudentResponseList responseList) {
              responseList.getStudentResponseList()
                  .forEach(r -> System.out.printf("Real name=%s\n", r.getRealName()));
            }

            @Override
            public void onError(Throwable t) {
              t.printStackTrace();
            }

            @Override
            public void onCompleted() {
              System.out.println("getRealNameByUsernames completed");
            }
          });

      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);

      requestStreamObserver.onCompleted();

    } catch (StatusRuntimeException e) {
      e.printStackTrace();
    }
  }

  public void getRealNamesByUsernames(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {

      StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
          .getRealNamesByUsernames(new StreamObserver<StudentResponse>() {
            @Override
            public void onNext(StudentResponse response) {
              System.out.printf("Real name=%s\n", response.getRealName());
            }

            @Override
            public void onError(Throwable t) {
              t.printStackTrace();
            }

            @Override
            public void onCompleted() {
              System.out.println("getRealNameByUsernames completed");
            }
          });

      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);

      requestStreamObserver.onCompleted();

    } catch (StatusRuntimeException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) throws InterruptedException {

    ManagedChannel channel = ManagedChannelBuilder
        .forAddress("localhost", GRPC_SERVER_PORT).usePlaintext().build();

    try {

      StudentGrpcClient client = new StudentGrpcClient(channel);

      int count = 1;

      for (int i = 0; i < count; i++) {
        client.getRealNameByUsername("admin2018");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNameByUsernameLike("admin2019");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNameByUsernames("admin2020");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNamesByUsernames("admin2021");
      }

      Thread.sleep(10000);

    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}

Client代碼(FutureStub)

僅適用於單請求單響應的簡單rpc調用:

try {

  ListenableFuture<StudentResponse> future = futureStub.getRealNameByUsername(
      StudentUsernameRequest.newBuilder().setUsername(username).build());

  // 阻塞等待
  // StudentResponse studentResponse = future.get();

  CountDownLatch latch = new CountDownLatch(1);

  Futures.addCallback(future, new FutureCallback<StudentResponse>() {
    @Override
    public void onSuccess(StudentResponse response) {
      System.out.printf("Real name=%s\n", response.getRealName());
      latch.countDown();
    }

    @Override
    public void onFailure(Throwable t) {
      System.err.println(t.getMessage());
      latch.countDown();
    }
  }, Executors.newSingleThreadExecutor());

  latch.await();

} catch (StatusRuntimeException | InterruptedException e) {
  System.err.println(e.getMessage());
}

Nacos中grpc的使用

在Nacos中,proto文件並沒有定義所有的介面,而是只定義了基礎的轉發介面和通用請求響應Payload結構體。

具體的介面請求響應結構體在業務代碼中編寫,業務介面則是使用轉發介面進行路由,類似SpringMVC中DispatcherServlet轉發請求給Controller一樣。

本小節將簡單介紹Nacos中集成grpc的方式。

服務端

BaseGrpcServer

抽象類BaseRpcServer定義了rpc伺服器的框架邏輯,模板方法startServer()要子類實現,是啟動rpc伺服器的核心邏輯。

抽象類BaseGrpcServer繼承了BaseRpcServer類,封裝了grpc組件:

  • Server - GRPC伺服器對象
  • GrpcRequestAcceptor - 業務請求接收、轉發器
  • GrpcBiStreamRequestAcceptor - 連接請求接收處理器,用於獲取雙向流發送StreamObserver
  • ConnectionManager - 連接管理器

startServer()方法封裝了啟動grpc伺服器的邏輯:

public void startServer() throws Exception {
    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();

    // server interceptor to set connection id.
    ServerInterceptor serverInterceptor = new ServerInterceptor() {
        @Override
        public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                ServerCallHandler<T, S> next) {
            // 把connectionId、ip、port等保存到Context上
            Context ctx = Context.current()
                    .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_PORT,
                               call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                    .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
            if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                Channel internalChannel = getInternalChannel(call);
                // 保存channel
                ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
            }
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    };
    // 添加轉發組件
    addServices(handlerRegistry, serverInterceptor);

    // 創建Server
    server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
            .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            .addTransportFilter(new ServerTransportFilter() {
                @Override
                public Attributes transportReady(Attributes transportAttrs) {
                    // 在連接建立時獲取ip、port等信息,生成connectionId
                    InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                    InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                    int remotePort = remoteAddress.getPort();
                    int localPort = localAddress.getPort();
                    String remoteIp = remoteAddress.getAddress().getHostAddress();
                    Attributes attrWrapper = transportAttrs.toBuilder()
                            .set(TRANS_KEY_CONN_ID,
                                 System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                            .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                            .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                    return attrWrapper;
                    
                }

                @Override
                public void transportTerminated(Attributes transportAttrs) {
                    String connectionId = null;
                    try {
                        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                    } catch (Exception e) {
                        // Ignore
                    }
                    if (StringUtils.isNotBlank(connectionId)) {
                        // 連接斷開時,從connectionManager移除
                        connectionManager.unregister(connectionId);
                    }
                }
            }).build();

    server.start();
}

GrpcBiStreamRequestAcceptor

grpc bi stream request handler.

主要功能就是封裝服務端Connection對象:

if (parseObj instanceof ConnectionSetupRequest) {
    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
    Map<String, String> labels = setUpRequest.getLabels();
    String appName = "-";
    if (labels != null && labels.containsKey(Constants.APPNAME)) {
        appName = labels.get(Constants.APPNAME);
    }

    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
    metaInfo.setTenant(setUpRequest.getTenant());
    // 封裝連接基礎信息和responseObserver、channel
    // 使用responseObserver向客戶端推送消息
    Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
    connection.setAbilities(setUpRequest.getAbilities());
    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
    // 註冊到connectionManager
    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
        try {
            connection.request(new ConnectResetRequest(), 3000L);
            connection.close();
        } catch (Exception e) {
            
        }
    }
}

GrpcRequestAcceptor

處理業務請求,將業務請求轉發到RequestHandler上:

public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {

    String type = grpcRequest.getMetadata().getType();

    // 查找RequestHandler處理器對象
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    // no handler found
    if (requestHandler == null) {
        Payload payloadResponse = GrpcUtils
                .convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // 請求體反序列化
    Object parseObj = null;
    try {
        parseObj = GrpcUtils.parse(grpcRequest);
    } catch (Exception e) {
        Payload payloadResponse = GrpcUtils.convert(
            buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // 處理業務請求
    Request request = (Request) parseObj;
    try {
        Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        // 刷新客戶端活躍狀態
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        // 使用RequestHandler處理請求
        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        // 響應
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    } catch (Throwable e) {
        Payload payloadResponse = GrpcUtils.convert(
            buildErrorResponse((e instanceof NacosException) ?
                               ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
                               e.getMessage()));
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    }
    
}

客戶端

客戶端使用GrpcSdkClient類,連接服務端的邏輯在其父類GrpcClient的connectToServer方法中:

public Connection connectToServer(ServerInfo serverInfo) {
    try {
        if (grpcExecutor == null) {
            int threadNumber = ThreadUtils.getSuitableThreadCount(8);
            grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(10000),
                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
                        "nacos-grpc-client-executor-%d").build());
            grpcExecutor.allowCoreThreadTimeOut(true);
        }
        int port = serverInfo.getServerPort() + rpcPortOffset();
        // 創建grpc客戶端stub
        RequestGrpc.RequestFutureStub newChannelStubTemp =
            createNewChannelStub(serverInfo.getServerIp(), port);
        if (newChannelStubTemp != null) {
            // 檢查服務端的可用性
            Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
            if (response == null || !(response instanceof ServerCheckResponse)) {
                shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
                return null;
            }
            // 創建biStreamStub
            BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
                    .newStub(newChannelStubTemp.getChannel());
            // 創建connection
            GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
            grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

            // create stream request and bind connection event to this connection.
            // 用於向服務端發送請求
            StreamObserver<Payload> payloadStreamObserver =
                bindRequestStream(biRequestStreamStub, grpcConn);

            // stream observer to send response to server
            grpcConn.setPayloadStreamObserver(payloadStreamObserver);
            grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
            grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
            // 發送一個ConnectionSetupRequest讓服務端創建Connection
            ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
            conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
            conSetupRequest.setLabels(super.getLabels());
            conSetupRequest.setAbilities(super.clientAbilities);
            conSetupRequest.setTenant(super.getTenant());
            grpcConn.sendRequest(conSetupRequest);
            return grpcConn;
        }
        return null;
    } catch (Exception e) {}
    return null;
}

private StreamObserver<Payload> bindRequestStream(
        final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConnection grpcConn) {
    return streamStub.requestBiStream(new StreamObserver<Payload>() {
        @Override
        public void onNext(Payload payload) {
            try {
                Object parseBody = GrpcUtils.parse(payload);
                final Request request = (Request) parseBody;
                if (request != null) {
                    try {
                        // 使用客戶端側的ServerRequestHandler處理服務端發送過來的數據
                        Response response = handleServerRequest(request);
                        if (response != null) {
                            response.setRequestId(request.getRequestId());
                            // 響應
                            sendResponse(response);
                        }
                    } catch (Exception e) {
                        sendResponse(request.getRequestId(), false);
                    }
                }
            } catch (Exception e) {
                // ...
            }
        }

        @Override
        public void onError(Throwable throwable) {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            } else {
                // ...
            }
        }

        @Override
        public void onCompleted() {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            }
        }
    });
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 多線程是每個程式員的噩夢,用得好可以提升效率很爽,用得不好就是埋汰的火葬場。 這裡不深入介紹,主要是講解一些標準用法,熟讀唐詩三百首,不會作詩也會吟。 這裡就介紹一下springboot中的多線程的使用,使用線程連接池去非同步執行業務方法。 由於代碼中包含詳細註釋,也為了保持文章的整潔性,我就不 ...
  • 目錄前言必備理論知識:例子: 前言 有C#經驗,使用起來,駕輕就熟。 就是語法糖。但是也要熟悉用法,才好眾享絲滑。 內容參考: Chatjpt、文心一言 必備理論知識: 捕獲列表: []:預設不捕獲任何變數; [=]:預設以值捕獲所有變數;內部有一個相應的副本 [&]:預設以引用捕獲所有變數; [x ...
  • 變數 變數是用於存儲數據值的容器。 創建變數 Python沒有用於聲明變數的命令。 變數在您第一次為其分配值時被創建。 示例 x = 5 y = "John" print(x) print(y) 變數不需要聲明為特定類型,並且甚至在設置後可以更改類型。 示例 x = 4 # x的類型為int x = ...
  • 0 前言 一般初學者學習編碼和[錯誤處理]時,先知道[編程語言]有一種處理錯誤的形式或約定(如Java就拋異常),然後就開始用這些工具。但卻忽視這問題本質:處理錯誤是為了寫正確程式。可是 1 啥叫“正確”? 由解決的問題決定的。問題不同,解決方案不同。 如一個web介面接受用戶請求,參數age,也許 ...
  • 什麼是裝飾器,它們如何被使用,以及我們如何利用它們來構建代碼。我們將看到裝飾器是如何成為一個強大的工具,可以用來為我們的應用程式添加功能,並且可以在Python編程語言中找到。 裝飾器順序 在Python中,裝飾器是一個特殊的函數,可以修改另一個函數的行為。裝飾器是一種設計模式,它在不改變現有對象結 ...
  • dataclass 到 Python 中的 JSON JavaScript Object Notation或JSON表示使用編程語言中的文本組成的腳本(可執行)文件來存儲和傳輸數據。 Python通過JSON內置模塊支持JSON。因此,我們在Python腳本中導入JSON包,以利用這一能力。 JSO ...
  • 前言 很多時候,由於種種不可描述的原因,我們需要針對單個介面實現介面限流,防止訪問次數過於頻繁。這裡就用 redis+aop 實現一個限流介面註解 @RedisLimit 代碼 點擊查看RedisLimit註解代碼 import java.lang.annotation.*; /** * 功能:分佈 ...
  • 對重寫代碼說不。 以下為譯文: 1、重寫代碼消耗了12個月! 我們從頭開始重寫代碼浪費的時間。 你能想象在軟體行業,12個月的時間沒有任何新產品推出,沒有任何新版本更新嗎? 真的,我不由自主地問自己這個問題: 在這個快速發展的世界里,12月的時間能讓我們做多少事情? “2015年1月20日,星期二, ...
一周排行
    -Advertisement-
    Play Games
  • 前言 微服務架構已經成為搭建高效、可擴展系統的關鍵技術之一,然而,現有許多微服務框架往往過於複雜,使得我們普通開發者難以快速上手並體驗到微服務帶了的便利。為瞭解決這一問題,於是作者精心打造了一款最接地氣的 .NET 微服務框架,幫助我們輕鬆構建和管理微服務應用。 本框架不僅支持 Consul 服務註 ...
  • 先看一下效果吧: 如果不會寫動畫或者懶得寫動畫,就直接交給Blend來做吧; 其實Blend操作起來很簡單,有點類似於在操作PS,我們只需要設置關鍵幀,滑鼠點來點去就可以了,Blend會自動幫我們生成我們想要的動畫效果. 第一步:要創建一個空的WPF項目 第二步:右鍵我們的項目,在最下方有一個,在B ...
  • Prism:框架介紹與安裝 什麼是Prism? Prism是一個用於在 WPF、Xamarin Form、Uno 平臺和 WinUI 中構建鬆散耦合、可維護和可測試的 XAML 應用程式框架 Github https://github.com/PrismLibrary/Prism NuGet htt ...
  • 在WPF中,屏幕上的所有內容,都是通過畫筆(Brush)畫上去的。如按鈕的背景色,邊框,文本框的前景和形狀填充。藉助畫筆,可以繪製頁面上的所有UI對象。不同畫筆具有不同類型的輸出( 如:某些畫筆使用純色繪製區域,其他畫筆使用漸變、圖案、圖像或繪圖)。 ...
  • 前言 嗨,大家好!推薦一個基於 .NET 8 的高併發微服務電商系統,涵蓋了商品、訂單、會員、服務、財務等50多種實用功能。 項目不僅使用了 .NET 8 的最新特性,還集成了AutoFac、DotLiquid、HangFire、Nlog、Jwt、LayUIAdmin、SqlSugar、MySQL、 ...
  • 本文主要介紹攝像頭(相機)如何採集數據,用於類似攝像頭本地顯示軟體,以及流媒體數據傳輸場景如傳屏、視訊會議等。 攝像頭採集有多種方案,如AForge.NET、WPFMediaKit、OpenCvSharp、EmguCv、DirectShow.NET、MediaCaptre(UWP),網上一些文章以及 ...
  • 前言 Seal-Report 是一款.NET 開源報表工具,擁有 1.4K Star。它提供了一個完整的框架,使用 C# 編寫,最新的版本採用的是 .NET 8.0 。 它能夠高效地從各種資料庫或 NoSQL 數據源生成日常報表,並支持執行複雜的報表任務。 其簡單易用的安裝過程和直觀的設計界面,我們 ...
  • 背景需求: 系統需要對接到XXX官方的API,但因此官方對接以及管理都十分嚴格。而本人部門的系統中包含諸多子系統,系統間為了穩定,程式間多數固定Token+特殊驗證進行調用,且後期還要提供給其他兄弟部門系統共同調用。 原則上:每套系統都必須單獨接入到官方,但官方的接入複雜,還要官方指定機構認證的證書 ...
  • 本文介紹下電腦設備關機的情況下如何通過網路喚醒設備,之前電源S狀態 電腦Power電源狀態- 唐宋元明清2188 - 博客園 (cnblogs.com) 有介紹過遠程喚醒設備,後面這倆天瞭解多了點所以單獨加個隨筆 設備關機的情況下,使用網路喚醒的前提條件: 1. 被喚醒設備需要支持這WakeOnL ...
  • 前言 大家好,推薦一個.NET 8.0 為核心,結合前端 Vue 框架,實現了前後端完全分離的設計理念。它不僅提供了強大的基礎功能支持,如許可權管理、代碼生成器等,還通過採用主流技術和最佳實踐,顯著降低了開發難度,加快了項目交付速度。 如果你需要一個高效的開發解決方案,本框架能幫助大家輕鬆應對挑戰,實 ...