Nacos源碼 (6) Grpc概述與Nacos集成

来源:https://www.cnblogs.com/xugf/archive/2023/09/18/17711539.html
-Advertisement-
Play Games

Nacos 2.x版本增加了GRPC服務介面和客戶端,極大的提升了Nacos的性能,本文將簡單介紹grpc-java的使用方式以及Nacos中集成GRPC的方式。 grpc-java GRPC是google開源的、以protobuf作為序列化方式、以http2作為通信協議的高性能rpc框架。 grp ...


Nacos 2.x版本增加了GRPC服務介面和客戶端,極大的提升了Nacos的性能,本文將簡單介紹grpc-java的使用方式以及Nacos中集成GRPC的方式。

grpc-java

GRPC是google開源的、以protobuf作為序列化方式、以http2作為通信協議的高性能rpc框架。

grpc-java是grpc對java語言的實現,使用Netty/Okhttp作為通信組件。

使用方式

添加依賴

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty-shaded</artifactId>
  <version>1.56.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-protobuf</artifactId>
  <version>1.56.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-stub</artifactId>
  <version>1.56.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
  <groupId>org.apache.tomcat</groupId>
  <artifactId>annotations-api</artifactId>
  <version>6.0.53</version>
  <scope>provided</scope>
</dependency>

生成代碼

需要將.proto文件放到src/main/proto或src/test/proto目錄下。

然後添加生成代碼使用的插件:

For protobuf-based codegen integrated with the Maven build system, you can use protobuf-maven-plugin (Eclipse and NetBeans users should also look at os-maven-plugin's IDE documentation):

<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.7.1</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}</pluginArtifact>
        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
        <clearOutputDirectory>false</clearOutputDirectory>
        <skip>false</skip>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

生成代碼:

mvn clean compile

High-level組件

At a high level there are three distinct layers to the library: Stub, Channel, and Transport.

Stub

The Stub layer is what is exposed to most developers and provides type-safe bindings to whatever datamodel/IDL/interface you are adapting. gRPC comes with a plugin to the protocol-buffers compiler that generates Stub interfaces out of .proto files, but bindings to other datamodel/IDL are easy and encouraged.

Channel

The Channel layer is an abstraction over Transport handling that is suitable for interception/decoration and exposes more behavior to the application than the Stub layer. It is intended to be easy for application frameworks to use this layer to address cross-cutting concerns such as logging, monitoring, auth, etc.

Transport

The Transport layer does the heavy lifting of putting and taking bytes off the wire. The interfaces to it are abstract just enough to allow plugging in of different implementations. Note the transport layer API is considered internal to gRPC and has weaker API guarantees than the core API under package io.grpc.

gRPC comes with multiple Transport implementations:

  • The Netty-based HTTP/2 transport is the main transport implementation based on Netty. It is not officially supported on Android.
  • The OkHttp-based HTTP/2 transport is a lightweight transport based on Okio and forked low-level parts of OkHttp. It is mainly for use on Android.
  • The in-process transport is for when a server is in the same process as the client. It is used frequently for testing, while also being safe for production use.
  • The Binder transport is for Android cross-process communication on a single device.

四種通信模式

  • 簡單rpc - 一個請求一個響應

    rpc getRealNameByUsername (StudentRequest) returns (StudentResponse) {}
    
  • 服務端流式rpc - 服務端流式響應

    rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
    
  • 客戶端流式rpc - 客戶端流式請求

    rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
    
  • 雙向流rpc

    rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
    

高級應用

  1. 攔截器
  2. Stream Tracer - 流攔截器
  3. Retry Policy - 客戶端重試
  4. NameResolver - 服務發現
  5. 負載均衡
  6. grpc與微服務:與dubbo、gateway、jwt、nacos2.x、openfeign

基礎示例

本小節將使用簡單的示例說明grpc-java的使用方法。

.proto文件

.proto文件需要放在src/main/proto目錄下麵:

syntax = "proto3";

package org.net5ijy.grpc.auto;

option java_package = "org.net5ijy.grpc.auto";
option java_outer_classname = "StudentRpc";
option java_multiple_files = true;

service StudentService {
  rpc getRealNameByUsername (StudentUsernameRequest) returns (StudentResponse) {}
  rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
  rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
  rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
}

message StudentUsernameRequest {
  string username = 1;
}

message StudentResponse {
  string realName = 1;
}

message StudentResponseList {
  repeated StudentResponse studentResponse = 1;
}

pom依賴

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.56.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.56.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.56.0</version>
</dependency>

pom插件

<build>
    <finalName>${project.artifactId}</finalName>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.7.1</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <!-- mvn protobuf:compile -->
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>
                    com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}
                </protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>
                    io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}
                </pluginArtifact>
                <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                <clearOutputDirectory>false</clearOutputDirectory>
                <skip>false</skip>
            </configuration>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

編譯生成代碼

mvn clean compile

編寫業務實現類

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {

  @Override
  public void getRealNameByUsername(StudentUsernameRequest request,
      StreamObserver<StudentResponse> responseObserver) {

    String username = request.getUsername();
    System.out.printf("username=%s\n", username);

    StudentResponse response = StudentResponse.newBuilder().setRealName("徐國峰").build();

    responseObserver.onNext(response);
    responseObserver.onCompleted();
  }

  @Override
  public void getRealNameByUsernameLike(StudentUsernameRequest request,
      StreamObserver<StudentResponse> responseObserver) {

    String username = request.getUsername();
    System.out.printf("username=%s\n", username);

    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰1").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰2").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰3").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰4").build());
    responseObserver.onCompleted();
  }

  @Override
  public StreamObserver<StudentUsernameRequest> getRealNameByUsernames(
      StreamObserver<StudentResponseList> responseObserver) {
    return new StreamObserver<StudentUsernameRequest>() {
      @Override
      public void onNext(StudentUsernameRequest request) {
        System.out.printf("username=%s\n", request.getUsername());
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }

      @Override
      public void onCompleted() {
        StudentResponse response1 = StudentResponse.newBuilder().setRealName("徐國峰5").build();
        StudentResponse response2 = StudentResponse.newBuilder().setRealName("徐國峰6").build();
        StudentResponse response3 = StudentResponse.newBuilder().setRealName("徐國峰7").build();
        StudentResponse response4 = StudentResponse.newBuilder().setRealName("徐國峰8").build();

        StudentResponseList responseList = StudentResponseList.newBuilder()
            .addStudentResponse(response1)
            .addStudentResponse(response2)
            .addStudentResponse(response3)
            .addStudentResponse(response4)
            .build();

        responseObserver.onNext(responseList);
        responseObserver.onCompleted();
      }
    };
  }

  @Override
  public StreamObserver<StudentUsernameRequest> getRealNamesByUsernames(
      StreamObserver<StudentResponse> responseObserver) {
    return new StreamObserver<StudentUsernameRequest>() {
      @Override
      public void onNext(StudentUsernameRequest request) {
        System.out.printf("username=%s\n", request.getUsername());
        StudentResponse response = StudentResponse.newBuilder()
            .setRealName("徐國峰" + new Random().nextInt(10)).build();
        responseObserver.onNext(response);
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }

      @Override
      public void onCompleted() {
        responseObserver.onCompleted();
      }
    };
  }
}

Server代碼

public class StudentGrpcServer {

  private static final AtomicInteger COUNT = new AtomicInteger(0);

  static final int GRPC_SERVER_PORT = 50051;

  private Server server;

  private void start() throws IOException {

    // grpc server executor
    Executor executor = new ThreadPoolExecutor(8, 16, 120, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        r -> {
          Thread t = new Thread(r);
          t.setName("stu-grpc-server-" + COUNT.incrementAndGet());
          return t;
        });

    /* The port on which the server should run */
    this.server = ServerBuilder.forPort(GRPC_SERVER_PORT).executor(executor)
        .compressorRegistry(CompressorRegistry.getDefaultInstance())
        .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
        .addService(new StudentServiceImpl())
        .intercept(serverInterceptor())
        .addTransportFilter(serverTransportFilter())
        .build();

    this.server.start();

    System.out.println("Server started, listening on " + GRPC_SERVER_PORT);

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down");
      try {
        StudentGrpcServer.this.stop();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.err.println("*** server shut down");
    }));
  }

  private void stop() throws InterruptedException {
    if (this.server != null && !this.server.isShutdown()) {
      this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }

  private void blockUntilShutdown() throws InterruptedException {
    if (this.server != null) {
      this.server.awaitTermination();
    }
  }

  private ServerInterceptor serverInterceptor() {
    return new ServerInterceptor() {
      @Override
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
          Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        Context ctx = Context.current();
        return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);
      }
    };
  }

  private ServerTransportFilter serverTransportFilter() {
    return new ServerTransportFilter() {
      @Override
      public Attributes transportReady(Attributes transportAttrs) {
        return super.transportReady(transportAttrs);
      }

      @Override
      public void transportTerminated(Attributes transportAttrs) {
        super.transportTerminated(transportAttrs);
      }
    };
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    final StudentGrpcServer server = new StudentGrpcServer();
    server.start();
    server.blockUntilShutdown();
  }
}

Client代碼

public class StudentGrpcClient {

  private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub;
  private final StudentServiceGrpc.StudentServiceStub stub;

  public StudentGrpcClient(Channel channel) {
    // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
    // shut it down.

    // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
    this.blockingStub = StudentServiceGrpc.newBlockingStub(channel);
    this.stub = StudentServiceGrpc.newStub(channel);
  }

  public void getRealNameByUsername(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {
      StudentResponse response = this.blockingStub.getRealNameByUsername(request);
      System.out.printf("Real name=%s\n", response.getRealName());
    } catch (StatusRuntimeException e) {
      System.err.println(e.getMessage());
    }
  }

  public void getRealNameByUsernameLike(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {
      Iterator<StudentResponse> iterator = this.blockingStub.getRealNameByUsernameLike(request);
      iterator.forEachRemaining(r -> System.out.printf("Real name=%s\n", r.getRealName()));
    } catch (StatusRuntimeException e) {
      System.err.println(e.getMessage());
    }
  }

  public void getRealNameByUsernames(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {

      StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
          .getRealNameByUsernames(new StreamObserver<StudentResponseList>() {
            @Override
            public void onNext(StudentResponseList responseList) {
              responseList.getStudentResponseList()
                  .forEach(r -> System.out.printf("Real name=%s\n", r.getRealName()));
            }

            @Override
            public void onError(Throwable t) {
              t.printStackTrace();
            }

            @Override
            public void onCompleted() {
              System.out.println("getRealNameByUsernames completed");
            }
          });

      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);

      requestStreamObserver.onCompleted();

    } catch (StatusRuntimeException e) {
      e.printStackTrace();
    }
  }

  public void getRealNamesByUsernames(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {

      StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
          .getRealNamesByUsernames(new StreamObserver<StudentResponse>() {
            @Override
            public void onNext(StudentResponse response) {
              System.out.printf("Real name=%s\n", response.getRealName());
            }

            @Override
            public void onError(Throwable t) {
              t.printStackTrace();
            }

            @Override
            public void onCompleted() {
              System.out.println("getRealNameByUsernames completed");
            }
          });

      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);

      requestStreamObserver.onCompleted();

    } catch (StatusRuntimeException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) throws InterruptedException {

    ManagedChannel channel = ManagedChannelBuilder
        .forAddress("localhost", GRPC_SERVER_PORT).usePlaintext().build();

    try {

      StudentGrpcClient client = new StudentGrpcClient(channel);

      int count = 1;

      for (int i = 0; i < count; i++) {
        client.getRealNameByUsername("admin2018");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNameByUsernameLike("admin2019");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNameByUsernames("admin2020");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNamesByUsernames("admin2021");
      }

      Thread.sleep(10000);

    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}

Client代碼(FutureStub)

僅適用於單請求單響應的簡單rpc調用:

try {

  ListenableFuture<StudentResponse> future = futureStub.getRealNameByUsername(
      StudentUsernameRequest.newBuilder().setUsername(username).build());

  // 阻塞等待
  // StudentResponse studentResponse = future.get();

  CountDownLatch latch = new CountDownLatch(1);

  Futures.addCallback(future, new FutureCallback<StudentResponse>() {
    @Override
    public void onSuccess(StudentResponse response) {
      System.out.printf("Real name=%s\n", response.getRealName());
      latch.countDown();
    }

    @Override
    public void onFailure(Throwable t) {
      System.err.println(t.getMessage());
      latch.countDown();
    }
  }, Executors.newSingleThreadExecutor());

  latch.await();

} catch (StatusRuntimeException | InterruptedException e) {
  System.err.println(e.getMessage());
}

Nacos中grpc的使用

在Nacos中,proto文件並沒有定義所有的介面,而是只定義了基礎的轉發介面和通用請求響應Payload結構體。

具體的介面請求響應結構體在業務代碼中編寫,業務介面則是使用轉發介面進行路由,類似SpringMVC中DispatcherServlet轉發請求給Controller一樣。

本小節將簡單介紹Nacos中集成grpc的方式。

服務端

BaseGrpcServer

抽象類BaseRpcServer定義了rpc伺服器的框架邏輯,模板方法startServer()要子類實現,是啟動rpc伺服器的核心邏輯。

抽象類BaseGrpcServer繼承了BaseRpcServer類,封裝了grpc組件:

  • Server - GRPC伺服器對象
  • GrpcRequestAcceptor - 業務請求接收、轉發器
  • GrpcBiStreamRequestAcceptor - 連接請求接收處理器,用於獲取雙向流發送StreamObserver
  • ConnectionManager - 連接管理器

startServer()方法封裝了啟動grpc伺服器的邏輯:

public void startServer() throws Exception {
    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();

    // server interceptor to set connection id.
    ServerInterceptor serverInterceptor = new ServerInterceptor() {
        @Override
        public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                ServerCallHandler<T, S> next) {
            // 把connectionId、ip、port等保存到Context上
            Context ctx = Context.current()
                    .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_PORT,
                               call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                    .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
            if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                Channel internalChannel = getInternalChannel(call);
                // 保存channel
                ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
            }
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    };
    // 添加轉發組件
    addServices(handlerRegistry, serverInterceptor);

    // 創建Server
    server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
            .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            .addTransportFilter(new ServerTransportFilter() {
                @Override
                public Attributes transportReady(Attributes transportAttrs) {
                    // 在連接建立時獲取ip、port等信息,生成connectionId
                    InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                    InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                    int remotePort = remoteAddress.getPort();
                    int localPort = localAddress.getPort();
                    String remoteIp = remoteAddress.getAddress().getHostAddress();
                    Attributes attrWrapper = transportAttrs.toBuilder()
                            .set(TRANS_KEY_CONN_ID,
                                 System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                            .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                            .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                    return attrWrapper;
                    
                }

                @Override
                public void transportTerminated(Attributes transportAttrs) {
                    String connectionId = null;
                    try {
                        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                    } catch (Exception e) {
                        // Ignore
                    }
                    if (StringUtils.isNotBlank(connectionId)) {
                        // 連接斷開時,從connectionManager移除
                        connectionManager.unregister(connectionId);
                    }
                }
            }).build();

    server.start();
}

GrpcBiStreamRequestAcceptor

grpc bi stream request handler.

主要功能就是封裝服務端Connection對象:

if (parseObj instanceof ConnectionSetupRequest) {
    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
    Map<String, String> labels = setUpRequest.getLabels();
    String appName = "-";
    if (labels != null && labels.containsKey(Constants.APPNAME)) {
        appName = labels.get(Constants.APPNAME);
    }

    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
    metaInfo.setTenant(setUpRequest.getTenant());
    // 封裝連接基礎信息和responseObserver、channel
    // 使用responseObserver向客戶端推送消息
    Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
    connection.setAbilities(setUpRequest.getAbilities());
    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
    // 註冊到connectionManager
    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
        try {
            connection.request(new ConnectResetRequest(), 3000L);
            connection.close();
        } catch (Exception e) {
            
        }
    }
}

GrpcRequestAcceptor

處理業務請求,將業務請求轉發到RequestHandler上:

public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {

    String type = grpcRequest.getMetadata().getType();

    // 查找RequestHandler處理器對象
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    // no handler found
    if (requestHandler == null) {
        Payload payloadResponse = GrpcUtils
                .convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // 請求體反序列化
    Object parseObj = null;
    try {
        parseObj = GrpcUtils.parse(grpcRequest);
    } catch (Exception e) {
        Payload payloadResponse = GrpcUtils.convert(
            buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // 處理業務請求
    Request request = (Request) parseObj;
    try {
        Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        // 刷新客戶端活躍狀態
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        // 使用RequestHandler處理請求
        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        // 響應
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    } catch (Throwable e) {
        Payload payloadResponse = GrpcUtils.convert(
            buildErrorResponse((e instanceof NacosException) ?
                               ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
                               e.getMessage()));
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    }
    
}

客戶端

客戶端使用GrpcSdkClient類,連接服務端的邏輯在其父類GrpcClient的connectToServer方法中:

public Connection connectToServer(ServerInfo serverInfo) {
    try {
        if (grpcExecutor == null) {
            int threadNumber = ThreadUtils.getSuitableThreadCount(8);
            grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(10000),
                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
                        "nacos-grpc-client-executor-%d").build());
            grpcExecutor.allowCoreThreadTimeOut(true);
        }
        int port = serverInfo.getServerPort() + rpcPortOffset();
        // 創建grpc客戶端stub
        RequestGrpc.RequestFutureStub newChannelStubTemp =
            createNewChannelStub(serverInfo.getServerIp(), port);
        if (newChannelStubTemp != null) {
            // 檢查服務端的可用性
            Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
            if (response == null || !(response instanceof ServerCheckResponse)) {
                shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
                return null;
            }
            // 創建biStreamStub
            BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
                    .newStub(newChannelStubTemp.getChannel());
            // 創建connection
            GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
            grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

            // create stream request and bind connection event to this connection.
            // 用於向服務端發送請求
            StreamObserver<Payload> payloadStreamObserver =
                bindRequestStream(biRequestStreamStub, grpcConn);

            // stream observer to send response to server
            grpcConn.setPayloadStreamObserver(payloadStreamObserver);
            grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
            grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
            // 發送一個ConnectionSetupRequest讓服務端創建Connection
            ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
            conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
            conSetupRequest.setLabels(super.getLabels());
            conSetupRequest.setAbilities(super.clientAbilities);
            conSetupRequest.setTenant(super.getTenant());
            grpcConn.sendRequest(conSetupRequest);
            return grpcConn;
        }
        return null;
    } catch (Exception e) {}
    return null;
}

private StreamObserver<Payload> bindRequestStream(
        final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConnection grpcConn) {
    return streamStub.requestBiStream(new StreamObserver<Payload>() {
        @Override
        public void onNext(Payload payload) {
            try {
                Object parseBody = GrpcUtils.parse(payload);
                final Request request = (Request) parseBody;
                if (request != null) {
                    try {
                        // 使用客戶端側的ServerRequestHandler處理服務端發送過來的數據
                        Response response = handleServerRequest(request);
                        if (response != null) {
                            response.setRequestId(request.getRequestId());
                            // 響應
                            sendResponse(response);
                        }
                    } catch (Exception e) {
                        sendResponse(request.getRequestId(), false);
                    }
                }
            } catch (Exception e) {
                // ...
            }
        }

        @Override
        public void onError(Throwable throwable) {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            } else {
                // ...
            }
        }

        @Override
        public void onCompleted() {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            }
        }
    });
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 多線程是每個程式員的噩夢,用得好可以提升效率很爽,用得不好就是埋汰的火葬場。 這裡不深入介紹,主要是講解一些標準用法,熟讀唐詩三百首,不會作詩也會吟。 這裡就介紹一下springboot中的多線程的使用,使用線程連接池去非同步執行業務方法。 由於代碼中包含詳細註釋,也為了保持文章的整潔性,我就不 ...
  • 目錄前言必備理論知識:例子: 前言 有C#經驗,使用起來,駕輕就熟。 就是語法糖。但是也要熟悉用法,才好眾享絲滑。 內容參考: Chatjpt、文心一言 必備理論知識: 捕獲列表: []:預設不捕獲任何變數; [=]:預設以值捕獲所有變數;內部有一個相應的副本 [&]:預設以引用捕獲所有變數; [x ...
  • 變數 變數是用於存儲數據值的容器。 創建變數 Python沒有用於聲明變數的命令。 變數在您第一次為其分配值時被創建。 示例 x = 5 y = "John" print(x) print(y) 變數不需要聲明為特定類型,並且甚至在設置後可以更改類型。 示例 x = 4 # x的類型為int x = ...
  • 0 前言 一般初學者學習編碼和[錯誤處理]時,先知道[編程語言]有一種處理錯誤的形式或約定(如Java就拋異常),然後就開始用這些工具。但卻忽視這問題本質:處理錯誤是為了寫正確程式。可是 1 啥叫“正確”? 由解決的問題決定的。問題不同,解決方案不同。 如一個web介面接受用戶請求,參數age,也許 ...
  • 什麼是裝飾器,它們如何被使用,以及我們如何利用它們來構建代碼。我們將看到裝飾器是如何成為一個強大的工具,可以用來為我們的應用程式添加功能,並且可以在Python編程語言中找到。 裝飾器順序 在Python中,裝飾器是一個特殊的函數,可以修改另一個函數的行為。裝飾器是一種設計模式,它在不改變現有對象結 ...
  • dataclass 到 Python 中的 JSON JavaScript Object Notation或JSON表示使用編程語言中的文本組成的腳本(可執行)文件來存儲和傳輸數據。 Python通過JSON內置模塊支持JSON。因此,我們在Python腳本中導入JSON包,以利用這一能力。 JSO ...
  • 前言 很多時候,由於種種不可描述的原因,我們需要針對單個介面實現介面限流,防止訪問次數過於頻繁。這裡就用 redis+aop 實現一個限流介面註解 @RedisLimit 代碼 點擊查看RedisLimit註解代碼 import java.lang.annotation.*; /** * 功能:分佈 ...
  • 對重寫代碼說不。 以下為譯文: 1、重寫代碼消耗了12個月! 我們從頭開始重寫代碼浪費的時間。 你能想象在軟體行業,12個月的時間沒有任何新產品推出,沒有任何新版本更新嗎? 真的,我不由自主地問自己這個問題: 在這個快速發展的世界里,12月的時間能讓我們做多少事情? “2015年1月20日,星期二, ...
一周排行
    -Advertisement-
    Play Games
  • 前言 插件化的需求主要源於對軟體架構靈活性的追求,特別是在開發大型、複雜或需要不斷更新的軟體系統時,插件化可以提高軟體系統的可擴展性、可定製性、隔離性、安全性、可維護性、模塊化、易於升級和更新以及支持第三方開發等方面的能力,從而滿足不斷變化的業務需求和技術挑戰。 一、插件化探索 在WPF中我們想要開 ...
  • 歡迎ReaLTaiizor是一個用戶友好的、以設計為中心的.NET WinForms項目控制項庫,包含廣泛的組件。您可以使用不同的主題選項對項目進行個性化設置,並自定義用戶控制項,以使您的應用程式更加專業。 項目地址:https://github.com/Taiizor/ReaLTaiizor 步驟1: ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • Channel 是乾什麼的 The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consume ...
  • efcore如何優雅的實現按年分庫按月分表 介紹 本文ShardinfCore版本 本期主角: ShardingCore 一款ef-core下高性能、輕量級針對分表分庫讀寫分離的解決方案,具有零依賴、零學習成本、零業務代碼入侵適配 距離上次發文.net相關的已經有很久了,期間一直在從事java相關的 ...
  • 前言 Spacesniffer 是一個免費的文件掃描工具,通過使用樹狀圖可視化佈局,可以立即瞭解大文件夾的位置,幫助用戶處理找到這些文件夾 當前系統C盤空間 清理後系統C盤空間 下載 Spacesniffer 下載地址:https://spacesniffer.en.softonic.com/dow ...
  • EDP是一套集組織架構,許可權框架【功能許可權,操作許可權,數據訪問許可權,WebApi許可權】,自動化日誌,動態Interface,WebApi管理等基礎功能於一體的,基於.net的企業應用開發框架。通過友好的編碼方式實現數據行、列許可權的管控。 ...
  • 一、ReZero簡介 ReZero是一款.NET中間件 : 全網唯一開源界面操作就能生成API , 可以集成到任何.NET6+ API項目,無破壞性,也可讓非.NET用戶使用exe文件 免費開源:MIT最寬鬆協議 , 一直從事開源事業十年,一直堅持開源 1.1 純ReZero開發 適合.Net Co ...
  • 一:背景 1. 講故事 停了一個月沒有更新文章了,主要是忙於寫 C#內功修煉系列的PPT,現在基本上接近尾聲,可以回頭繼續更新這段時間分析dump的一些事故報告,有朋友微信上找到我,說他們的系統出現了大量的http超時,程式不響應處理了,讓我幫忙看下怎麼回事,dump也抓到了。 二:WinDbg分析 ...
  • 開始做項目管理了(本人3年java,來到這邊之後真沒想到...),天天開會溝通整理需求,他們講話的時候忙裡偷閑整理一下常用的方法,其實語言還是有共通性的,基本上看到方法名就大概能猜出來用法。出去打水的時候看到外面太陽好好,真想在外面坐著曬太陽,回來的時候好兄弟三年前送給我的鍵盤D鍵不靈了,在打"等待 ...