Nacos源碼 (6) Grpc概述與Nacos集成

来源:https://www.cnblogs.com/xugf/archive/2023/09/18/17711539.html
-Advertisement-
Play Games

Nacos 2.x版本增加了GRPC服務介面和客戶端,極大的提升了Nacos的性能,本文將簡單介紹grpc-java的使用方式以及Nacos中集成GRPC的方式。 grpc-java GRPC是google開源的、以protobuf作為序列化方式、以http2作為通信協議的高性能rpc框架。 grp ...


Nacos 2.x版本增加了GRPC服務介面和客戶端,極大的提升了Nacos的性能,本文將簡單介紹grpc-java的使用方式以及Nacos中集成GRPC的方式。

grpc-java

GRPC是google開源的、以protobuf作為序列化方式、以http2作為通信協議的高性能rpc框架。

grpc-java是grpc對java語言的實現,使用Netty/Okhttp作為通信組件。

使用方式

添加依賴

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty-shaded</artifactId>
  <version>1.56.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-protobuf</artifactId>
  <version>1.56.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-stub</artifactId>
  <version>1.56.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
  <groupId>org.apache.tomcat</groupId>
  <artifactId>annotations-api</artifactId>
  <version>6.0.53</version>
  <scope>provided</scope>
</dependency>

生成代碼

需要將.proto文件放到src/main/proto或src/test/proto目錄下。

然後添加生成代碼使用的插件:

For protobuf-based codegen integrated with the Maven build system, you can use protobuf-maven-plugin (Eclipse and NetBeans users should also look at os-maven-plugin's IDE documentation):

<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.7.1</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}</pluginArtifact>
        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
        <clearOutputDirectory>false</clearOutputDirectory>
        <skip>false</skip>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

生成代碼:

mvn clean compile

High-level組件

At a high level there are three distinct layers to the library: Stub, Channel, and Transport.

Stub

The Stub layer is what is exposed to most developers and provides type-safe bindings to whatever datamodel/IDL/interface you are adapting. gRPC comes with a plugin to the protocol-buffers compiler that generates Stub interfaces out of .proto files, but bindings to other datamodel/IDL are easy and encouraged.

Channel

The Channel layer is an abstraction over Transport handling that is suitable for interception/decoration and exposes more behavior to the application than the Stub layer. It is intended to be easy for application frameworks to use this layer to address cross-cutting concerns such as logging, monitoring, auth, etc.

Transport

The Transport layer does the heavy lifting of putting and taking bytes off the wire. The interfaces to it are abstract just enough to allow plugging in of different implementations. Note the transport layer API is considered internal to gRPC and has weaker API guarantees than the core API under package io.grpc.

gRPC comes with multiple Transport implementations:

  • The Netty-based HTTP/2 transport is the main transport implementation based on Netty. It is not officially supported on Android.
  • The OkHttp-based HTTP/2 transport is a lightweight transport based on Okio and forked low-level parts of OkHttp. It is mainly for use on Android.
  • The in-process transport is for when a server is in the same process as the client. It is used frequently for testing, while also being safe for production use.
  • The Binder transport is for Android cross-process communication on a single device.

四種通信模式

  • 簡單rpc - 一個請求一個響應

    rpc getRealNameByUsername (StudentRequest) returns (StudentResponse) {}
    
  • 服務端流式rpc - 服務端流式響應

    rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
    
  • 客戶端流式rpc - 客戶端流式請求

    rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
    
  • 雙向流rpc

    rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
    

高級應用

  1. 攔截器
  2. Stream Tracer - 流攔截器
  3. Retry Policy - 客戶端重試
  4. NameResolver - 服務發現
  5. 負載均衡
  6. grpc與微服務:與dubbo、gateway、jwt、nacos2.x、openfeign

基礎示例

本小節將使用簡單的示例說明grpc-java的使用方法。

.proto文件

.proto文件需要放在src/main/proto目錄下麵:

syntax = "proto3";

package org.net5ijy.grpc.auto;

option java_package = "org.net5ijy.grpc.auto";
option java_outer_classname = "StudentRpc";
option java_multiple_files = true;

service StudentService {
  rpc getRealNameByUsername (StudentUsernameRequest) returns (StudentResponse) {}
  rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
  rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
  rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
}

message StudentUsernameRequest {
  string username = 1;
}

message StudentResponse {
  string realName = 1;
}

message StudentResponseList {
  repeated StudentResponse studentResponse = 1;
}

pom依賴

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.56.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.56.0</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.56.0</version>
</dependency>

pom插件

<build>
    <finalName>${project.artifactId}</finalName>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.7.1</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <!-- mvn protobuf:compile -->
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>
                    com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}
                </protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>
                    io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}
                </pluginArtifact>
                <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                <clearOutputDirectory>false</clearOutputDirectory>
                <skip>false</skip>
            </configuration>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

編譯生成代碼

mvn clean compile

編寫業務實現類

public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {

  @Override
  public void getRealNameByUsername(StudentUsernameRequest request,
      StreamObserver<StudentResponse> responseObserver) {

    String username = request.getUsername();
    System.out.printf("username=%s\n", username);

    StudentResponse response = StudentResponse.newBuilder().setRealName("徐國峰").build();

    responseObserver.onNext(response);
    responseObserver.onCompleted();
  }

  @Override
  public void getRealNameByUsernameLike(StudentUsernameRequest request,
      StreamObserver<StudentResponse> responseObserver) {

    String username = request.getUsername();
    System.out.printf("username=%s\n", username);

    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰1").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰2").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰3").build());
    responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰4").build());
    responseObserver.onCompleted();
  }

  @Override
  public StreamObserver<StudentUsernameRequest> getRealNameByUsernames(
      StreamObserver<StudentResponseList> responseObserver) {
    return new StreamObserver<StudentUsernameRequest>() {
      @Override
      public void onNext(StudentUsernameRequest request) {
        System.out.printf("username=%s\n", request.getUsername());
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }

      @Override
      public void onCompleted() {
        StudentResponse response1 = StudentResponse.newBuilder().setRealName("徐國峰5").build();
        StudentResponse response2 = StudentResponse.newBuilder().setRealName("徐國峰6").build();
        StudentResponse response3 = StudentResponse.newBuilder().setRealName("徐國峰7").build();
        StudentResponse response4 = StudentResponse.newBuilder().setRealName("徐國峰8").build();

        StudentResponseList responseList = StudentResponseList.newBuilder()
            .addStudentResponse(response1)
            .addStudentResponse(response2)
            .addStudentResponse(response3)
            .addStudentResponse(response4)
            .build();

        responseObserver.onNext(responseList);
        responseObserver.onCompleted();
      }
    };
  }

  @Override
  public StreamObserver<StudentUsernameRequest> getRealNamesByUsernames(
      StreamObserver<StudentResponse> responseObserver) {
    return new StreamObserver<StudentUsernameRequest>() {
      @Override
      public void onNext(StudentUsernameRequest request) {
        System.out.printf("username=%s\n", request.getUsername());
        StudentResponse response = StudentResponse.newBuilder()
            .setRealName("徐國峰" + new Random().nextInt(10)).build();
        responseObserver.onNext(response);
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }

      @Override
      public void onCompleted() {
        responseObserver.onCompleted();
      }
    };
  }
}

Server代碼

public class StudentGrpcServer {

  private static final AtomicInteger COUNT = new AtomicInteger(0);

  static final int GRPC_SERVER_PORT = 50051;

  private Server server;

  private void start() throws IOException {

    // grpc server executor
    Executor executor = new ThreadPoolExecutor(8, 16, 120, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        r -> {
          Thread t = new Thread(r);
          t.setName("stu-grpc-server-" + COUNT.incrementAndGet());
          return t;
        });

    /* The port on which the server should run */
    this.server = ServerBuilder.forPort(GRPC_SERVER_PORT).executor(executor)
        .compressorRegistry(CompressorRegistry.getDefaultInstance())
        .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
        .addService(new StudentServiceImpl())
        .intercept(serverInterceptor())
        .addTransportFilter(serverTransportFilter())
        .build();

    this.server.start();

    System.out.println("Server started, listening on " + GRPC_SERVER_PORT);

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down");
      try {
        StudentGrpcServer.this.stop();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      System.err.println("*** server shut down");
    }));
  }

  private void stop() throws InterruptedException {
    if (this.server != null && !this.server.isShutdown()) {
      this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
  }

  private void blockUntilShutdown() throws InterruptedException {
    if (this.server != null) {
      this.server.awaitTermination();
    }
  }

  private ServerInterceptor serverInterceptor() {
    return new ServerInterceptor() {
      @Override
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
          Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        Context ctx = Context.current();
        return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);
      }
    };
  }

  private ServerTransportFilter serverTransportFilter() {
    return new ServerTransportFilter() {
      @Override
      public Attributes transportReady(Attributes transportAttrs) {
        return super.transportReady(transportAttrs);
      }

      @Override
      public void transportTerminated(Attributes transportAttrs) {
        super.transportTerminated(transportAttrs);
      }
    };
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    final StudentGrpcServer server = new StudentGrpcServer();
    server.start();
    server.blockUntilShutdown();
  }
}

Client代碼

public class StudentGrpcClient {

  private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub;
  private final StudentServiceGrpc.StudentServiceStub stub;

  public StudentGrpcClient(Channel channel) {
    // 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
    // shut it down.

    // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
    this.blockingStub = StudentServiceGrpc.newBlockingStub(channel);
    this.stub = StudentServiceGrpc.newStub(channel);
  }

  public void getRealNameByUsername(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {
      StudentResponse response = this.blockingStub.getRealNameByUsername(request);
      System.out.printf("Real name=%s\n", response.getRealName());
    } catch (StatusRuntimeException e) {
      System.err.println(e.getMessage());
    }
  }

  public void getRealNameByUsernameLike(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {
      Iterator<StudentResponse> iterator = this.blockingStub.getRealNameByUsernameLike(request);
      iterator.forEachRemaining(r -> System.out.printf("Real name=%s\n", r.getRealName()));
    } catch (StatusRuntimeException e) {
      System.err.println(e.getMessage());
    }
  }

  public void getRealNameByUsernames(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {

      StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
          .getRealNameByUsernames(new StreamObserver<StudentResponseList>() {
            @Override
            public void onNext(StudentResponseList responseList) {
              responseList.getStudentResponseList()
                  .forEach(r -> System.out.printf("Real name=%s\n", r.getRealName()));
            }

            @Override
            public void onError(Throwable t) {
              t.printStackTrace();
            }

            @Override
            public void onCompleted() {
              System.out.println("getRealNameByUsernames completed");
            }
          });

      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);

      requestStreamObserver.onCompleted();

    } catch (StatusRuntimeException e) {
      e.printStackTrace();
    }
  }

  public void getRealNamesByUsernames(String username) {
    StudentUsernameRequest request = StudentUsernameRequest
        .newBuilder().setUsername(username).build();
    try {

      StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
          .getRealNamesByUsernames(new StreamObserver<StudentResponse>() {
            @Override
            public void onNext(StudentResponse response) {
              System.out.printf("Real name=%s\n", response.getRealName());
            }

            @Override
            public void onError(Throwable t) {
              t.printStackTrace();
            }

            @Override
            public void onCompleted() {
              System.out.println("getRealNameByUsernames completed");
            }
          });

      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);
      requestStreamObserver.onNext(request);

      requestStreamObserver.onCompleted();

    } catch (StatusRuntimeException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) throws InterruptedException {

    ManagedChannel channel = ManagedChannelBuilder
        .forAddress("localhost", GRPC_SERVER_PORT).usePlaintext().build();

    try {

      StudentGrpcClient client = new StudentGrpcClient(channel);

      int count = 1;

      for (int i = 0; i < count; i++) {
        client.getRealNameByUsername("admin2018");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNameByUsernameLike("admin2019");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNameByUsernames("admin2020");
        Thread.sleep(20);
        System.out.println("---");
        client.getRealNamesByUsernames("admin2021");
      }

      Thread.sleep(10000);

    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}

Client代碼(FutureStub)

僅適用於單請求單響應的簡單rpc調用:

try {

  ListenableFuture<StudentResponse> future = futureStub.getRealNameByUsername(
      StudentUsernameRequest.newBuilder().setUsername(username).build());

  // 阻塞等待
  // StudentResponse studentResponse = future.get();

  CountDownLatch latch = new CountDownLatch(1);

  Futures.addCallback(future, new FutureCallback<StudentResponse>() {
    @Override
    public void onSuccess(StudentResponse response) {
      System.out.printf("Real name=%s\n", response.getRealName());
      latch.countDown();
    }

    @Override
    public void onFailure(Throwable t) {
      System.err.println(t.getMessage());
      latch.countDown();
    }
  }, Executors.newSingleThreadExecutor());

  latch.await();

} catch (StatusRuntimeException | InterruptedException e) {
  System.err.println(e.getMessage());
}

Nacos中grpc的使用

在Nacos中,proto文件並沒有定義所有的介面,而是只定義了基礎的轉發介面和通用請求響應Payload結構體。

具體的介面請求響應結構體在業務代碼中編寫,業務介面則是使用轉發介面進行路由,類似SpringMVC中DispatcherServlet轉發請求給Controller一樣。

本小節將簡單介紹Nacos中集成grpc的方式。

服務端

BaseGrpcServer

抽象類BaseRpcServer定義了rpc伺服器的框架邏輯,模板方法startServer()要子類實現,是啟動rpc伺服器的核心邏輯。

抽象類BaseGrpcServer繼承了BaseRpcServer類,封裝了grpc組件:

  • Server - GRPC伺服器對象
  • GrpcRequestAcceptor - 業務請求接收、轉發器
  • GrpcBiStreamRequestAcceptor - 連接請求接收處理器,用於獲取雙向流發送StreamObserver
  • ConnectionManager - 連接管理器

startServer()方法封裝了啟動grpc伺服器的邏輯:

public void startServer() throws Exception {
    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();

    // server interceptor to set connection id.
    ServerInterceptor serverInterceptor = new ServerInterceptor() {
        @Override
        public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                ServerCallHandler<T, S> next) {
            // 把connectionId、ip、port等保存到Context上
            Context ctx = Context.current()
                    .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                    .withValue(CONTEXT_KEY_CONN_REMOTE_PORT,
                               call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                    .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
            if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                Channel internalChannel = getInternalChannel(call);
                // 保存channel
                ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
            }
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    };
    // 添加轉發組件
    addServices(handlerRegistry, serverInterceptor);

    // 創建Server
    server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
            .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            .addTransportFilter(new ServerTransportFilter() {
                @Override
                public Attributes transportReady(Attributes transportAttrs) {
                    // 在連接建立時獲取ip、port等信息,生成connectionId
                    InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                    InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                            .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                    int remotePort = remoteAddress.getPort();
                    int localPort = localAddress.getPort();
                    String remoteIp = remoteAddress.getAddress().getHostAddress();
                    Attributes attrWrapper = transportAttrs.toBuilder()
                            .set(TRANS_KEY_CONN_ID,
                                 System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                            .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                            .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                    return attrWrapper;
                    
                }

                @Override
                public void transportTerminated(Attributes transportAttrs) {
                    String connectionId = null;
                    try {
                        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                    } catch (Exception e) {
                        // Ignore
                    }
                    if (StringUtils.isNotBlank(connectionId)) {
                        // 連接斷開時,從connectionManager移除
                        connectionManager.unregister(connectionId);
                    }
                }
            }).build();

    server.start();
}

GrpcBiStreamRequestAcceptor

grpc bi stream request handler.

主要功能就是封裝服務端Connection對象:

if (parseObj instanceof ConnectionSetupRequest) {
    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
    Map<String, String> labels = setUpRequest.getLabels();
    String appName = "-";
    if (labels != null && labels.containsKey(Constants.APPNAME)) {
        appName = labels.get(Constants.APPNAME);
    }

    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
    metaInfo.setTenant(setUpRequest.getTenant());
    // 封裝連接基礎信息和responseObserver、channel
    // 使用responseObserver向客戶端推送消息
    Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
    connection.setAbilities(setUpRequest.getAbilities());
    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
    // 註冊到connectionManager
    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
        try {
            connection.request(new ConnectResetRequest(), 3000L);
            connection.close();
        } catch (Exception e) {
            
        }
    }
}

GrpcRequestAcceptor

處理業務請求,將業務請求轉發到RequestHandler上:

public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {

    String type = grpcRequest.getMetadata().getType();

    // 查找RequestHandler處理器對象
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    // no handler found
    if (requestHandler == null) {
        Payload payloadResponse = GrpcUtils
                .convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // 請求體反序列化
    Object parseObj = null;
    try {
        parseObj = GrpcUtils.parse(grpcRequest);
    } catch (Exception e) {
        Payload payloadResponse = GrpcUtils.convert(
            buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }

    // 處理業務請求
    Request request = (Request) parseObj;
    try {
        Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        // 刷新客戶端活躍狀態
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        // 使用RequestHandler處理請求
        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        // 響應
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    } catch (Throwable e) {
        Payload payloadResponse = GrpcUtils.convert(
            buildErrorResponse((e instanceof NacosException) ?
                               ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
                               e.getMessage()));
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    }
    
}

客戶端

客戶端使用GrpcSdkClient類,連接服務端的邏輯在其父類GrpcClient的connectToServer方法中:

public Connection connectToServer(ServerInfo serverInfo) {
    try {
        if (grpcExecutor == null) {
            int threadNumber = ThreadUtils.getSuitableThreadCount(8);
            grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(10000),
                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
                        "nacos-grpc-client-executor-%d").build());
            grpcExecutor.allowCoreThreadTimeOut(true);
        }
        int port = serverInfo.getServerPort() + rpcPortOffset();
        // 創建grpc客戶端stub
        RequestGrpc.RequestFutureStub newChannelStubTemp =
            createNewChannelStub(serverInfo.getServerIp(), port);
        if (newChannelStubTemp != null) {
            // 檢查服務端的可用性
            Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
            if (response == null || !(response instanceof ServerCheckResponse)) {
                shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
                return null;
            }
            // 創建biStreamStub
            BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
                    .newStub(newChannelStubTemp.getChannel());
            // 創建connection
            GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
            grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

            // create stream request and bind connection event to this connection.
            // 用於向服務端發送請求
            StreamObserver<Payload> payloadStreamObserver =
                bindRequestStream(biRequestStreamStub, grpcConn);

            // stream observer to send response to server
            grpcConn.setPayloadStreamObserver(payloadStreamObserver);
            grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
            grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
            // 發送一個ConnectionSetupRequest讓服務端創建Connection
            ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
            conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
            conSetupRequest.setLabels(super.getLabels());
            conSetupRequest.setAbilities(super.clientAbilities);
            conSetupRequest.setTenant(super.getTenant());
            grpcConn.sendRequest(conSetupRequest);
            return grpcConn;
        }
        return null;
    } catch (Exception e) {}
    return null;
}

private StreamObserver<Payload> bindRequestStream(
        final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConnection grpcConn) {
    return streamStub.requestBiStream(new StreamObserver<Payload>() {
        @Override
        public void onNext(Payload payload) {
            try {
                Object parseBody = GrpcUtils.parse(payload);
                final Request request = (Request) parseBody;
                if (request != null) {
                    try {
                        // 使用客戶端側的ServerRequestHandler處理服務端發送過來的數據
                        Response response = handleServerRequest(request);
                        if (response != null) {
                            response.setRequestId(request.getRequestId());
                            // 響應
                            sendResponse(response);
                        }
                    } catch (Exception e) {
                        sendResponse(request.getRequestId(), false);
                    }
                }
            } catch (Exception e) {
                // ...
            }
        }

        @Override
        public void onError(Throwable throwable) {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            } else {
                // ...
            }
        }

        @Override
        public void onCompleted() {
            boolean isRunning = isRunning();
            boolean isAbandon = grpcConn.isAbandon();
            if (isRunning && !isAbandon) {
                if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                    switchServerAsync();
                }
            }
        }
    });
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 多線程是每個程式員的噩夢,用得好可以提升效率很爽,用得不好就是埋汰的火葬場。 這裡不深入介紹,主要是講解一些標準用法,熟讀唐詩三百首,不會作詩也會吟。 這裡就介紹一下springboot中的多線程的使用,使用線程連接池去非同步執行業務方法。 由於代碼中包含詳細註釋,也為了保持文章的整潔性,我就不 ...
  • 目錄前言必備理論知識:例子: 前言 有C#經驗,使用起來,駕輕就熟。 就是語法糖。但是也要熟悉用法,才好眾享絲滑。 內容參考: Chatjpt、文心一言 必備理論知識: 捕獲列表: []:預設不捕獲任何變數; [=]:預設以值捕獲所有變數;內部有一個相應的副本 [&]:預設以引用捕獲所有變數; [x ...
  • 變數 變數是用於存儲數據值的容器。 創建變數 Python沒有用於聲明變數的命令。 變數在您第一次為其分配值時被創建。 示例 x = 5 y = "John" print(x) print(y) 變數不需要聲明為特定類型,並且甚至在設置後可以更改類型。 示例 x = 4 # x的類型為int x = ...
  • 0 前言 一般初學者學習編碼和[錯誤處理]時,先知道[編程語言]有一種處理錯誤的形式或約定(如Java就拋異常),然後就開始用這些工具。但卻忽視這問題本質:處理錯誤是為了寫正確程式。可是 1 啥叫“正確”? 由解決的問題決定的。問題不同,解決方案不同。 如一個web介面接受用戶請求,參數age,也許 ...
  • 什麼是裝飾器,它們如何被使用,以及我們如何利用它們來構建代碼。我們將看到裝飾器是如何成為一個強大的工具,可以用來為我們的應用程式添加功能,並且可以在Python編程語言中找到。 裝飾器順序 在Python中,裝飾器是一個特殊的函數,可以修改另一個函數的行為。裝飾器是一種設計模式,它在不改變現有對象結 ...
  • dataclass 到 Python 中的 JSON JavaScript Object Notation或JSON表示使用編程語言中的文本組成的腳本(可執行)文件來存儲和傳輸數據。 Python通過JSON內置模塊支持JSON。因此,我們在Python腳本中導入JSON包,以利用這一能力。 JSO ...
  • 前言 很多時候,由於種種不可描述的原因,我們需要針對單個介面實現介面限流,防止訪問次數過於頻繁。這裡就用 redis+aop 實現一個限流介面註解 @RedisLimit 代碼 點擊查看RedisLimit註解代碼 import java.lang.annotation.*; /** * 功能:分佈 ...
  • 對重寫代碼說不。 以下為譯文: 1、重寫代碼消耗了12個月! 我們從頭開始重寫代碼浪費的時間。 你能想象在軟體行業,12個月的時間沒有任何新產品推出,沒有任何新版本更新嗎? 真的,我不由自主地問自己這個問題: 在這個快速發展的世界里,12月的時間能讓我們做多少事情? “2015年1月20日,星期二, ...
一周排行
    -Advertisement-
    Play Games
  • 最近做項目過程中,使用到了海康相機,官方只提供了C/C++的SDK,沒有搜尋到一個合適的封裝了的C#庫,故自己動手,簡單的封裝了一下,方便大家也方便自己使用和二次開發 ...
  • 前言 MediatR 是 .NET 下的一個實現消息傳遞的庫,輕量級、簡潔高效,用於實現進程內的消息傳遞機制。它基於中介者設計模式,支持請求/響應、命令、查詢、通知和事件等多種消息傳遞模式。通過泛型支持,MediatR 可以智能地調度不同類型的消息,非常適合用於領域事件處理。 在本文中,將通過一個簡 ...
  • 前言 今天給大家推薦一個超實用的開源項目《.NET 7 + Vue 許可權管理系統 小白快速上手》,DncZeus的願景就是做一個.NET 領域小白也能上手的簡易、通用的後臺許可權管理模板系統基礎框架。 不管你是技術小白還是技術大佬或者是不懂前端Vue 的新手,這個項目可以快速上手讓我們從0到1,搭建自 ...
  • 第1章:WPF概述 本章目標 瞭解Windows圖形演化 瞭解WPF高級API 瞭解解析度無關性概念 瞭解WPF體繫結構 瞭解WPF 4.5 WPF概述 ​ 歡迎使用 Windows Presentation Foundation (WPF) 桌面指南,這是一個與解析度無關的 UI 框架,使用基於矢 ...
  • 在日常開發中,並不是所有的功能都是用戶可見的,還在一些背後默默支持的程式,這些程式通常以服務的形式出現,統稱為輔助角色服務。今天以一個簡單的小例子,簡述基於.NET開發輔助角色服務的相關內容,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 第3章:佈局 本章目標 理解佈局的原則 理解佈局的過程 理解佈局的容器 掌握各類佈局容器的運用 理解 WPF 中的佈局 WPF 佈局原則 ​ WPF 視窗只能包含單個元素。為在WPF 視窗中放置多個元素並創建更貼近實用的用戶男面,需要在視窗上放置一個容器,然後在這個容器中添加其他元素。造成這一限制的 ...
  • 前言 在平時項目開發中,定時任務調度是一項重要的功能,廣泛應用於後臺作業、計劃任務和自動化腳本等模塊。 FreeScheduler 是一款輕量級且功能強大的定時任務調度庫,它支持臨時的延時任務和重覆迴圈任務(可持久化),能夠按秒、每天/每周/每月固定時間或自定義間隔執行(CRON 表達式)。 此外 ...
  • 目錄Blazor 組件基礎路由導航參數組件參數路由參數生命周期事件狀態更改組件事件 Blazor 組件 基礎 新建一個項目命名為 MyComponents ,項目模板的交互類型選 Auto ,其它保持預設選項: 客戶端組件 (Auto/WebAssembly): 最終解決方案裡面會有兩個項目:伺服器 ...
  • 先看一下效果吧: isChecked = false 的時候的效果 isChecked = true 的時候的效果 然後我們來實現一下這個效果吧 第一步:創建一個空的wpf項目; 第二步:在項目裡面添加一個checkbox <Grid> <CheckBox HorizontalAlignment=" ...
  • 在編寫上位機軟體時,需要經常處理命令拼接與其他設備進行通信,通常對不同的命令封裝成不同的方法,擴展稍許麻煩。 本次擬以特性方式實現,以兼顧維護性與擴展性。 思想: 一種命令對應一個類,其類中的各個屬性對應各個命令段,通過特性的方式,實現其在這包數據命令中的位置、大端或小端及其轉換為對應的目標類型; ...