Nacos 2.x added a gRPC server interface and client, which greatly improved Nacos performance. This article briefly introduces how to use grpc-java and how Nacos integrates gRPC.
grpc-java
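gRPC is a high-performance RPC framework open-sourced by Google. It uses protobuf for serialization and HTTP/2 as its transport protocol.
grpc-java is the Java implementation of gRPC and uses Netty or OkHttp as its communication component.
Usage
Adding dependencies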
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.56.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.56.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.56.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
Generating code
Place the .proto files in the src/main/proto or src/test/proto directory.
Then add the plugin used for code generation:
For protobuf-based codegen integrated with the Maven build system, you can use protobuf-maven-plugin (Eclipse and NetBeans users should also look at os-maven-plugin's IDE documentation):
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}</pluginArtifact>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
<skip>false</skip>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Generate the code:
mvn clean compile
High-level components
At a high level there are three distinct layers to the library: Stub, Channel, and Transport.
Stub
The Stub layer is what is exposed to most developers and provides type-safe bindings to whatever datamodel/IDL/interface you are adapting. gRPC comes with a plugin to the protocol-buffers compiler that generates Stub interfaces out of .proto files, but bindings to other datamodel/IDL are easy and encouraged.
Channel
The Channel layer is an abstraction over Transport handling that is suitable for interception/decoration and exposes more behavior to the application than the Stub layer. It is intended to be easy for application frameworks to use this layer to address cross-cutting concerns such as logging, monitoring, auth, etc.
Transport
The Transport layer does the heavy lifting of putting and taking bytes off the wire. The interfaces to it are abstract just enough to allow plugging in of different implementations. Note the transport layer API is considered internal to gRPC and has weaker API guarantees than the core API under package io.grpc.
gRPC comes with multiple Transport implementations:
- The Netty-based HTTP/2 transport is the main transport implementation based on Netty. It is not officially supported on Android.
- The OkHttp-based HTTP/2 transport is a lightweight transport based on Okio and forked low-level parts of OkHttp. It is mainly for use on Android.
- The in-process transport is for when a server is in the same process as the client. It is used frequently for testing, while also being safe for production use.
- The Binder transport is for Android cross-process communication on a single device.
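To make the layering concrete, here is a minimal sketch in plain Java with no gRPC dependency. ByteTransport, SimpleChannel, LoggingChannel and GreeterStub are hypothetical names invented for this illustration, not grpc-java types. The sketch only shows the shape of the design: a stub offers a typed API, delegates to a channel that can be decorated for cross-cutting concerns, and the channel hands bytes to a transport.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Transport layer: moves raw bytes.
interface ByteTransport {
    byte[] send(String method, byte[] request);
}

// Hypothetical stand-in for the Channel layer: an abstraction over the transport.
interface SimpleChannel {
    byte[] call(String method, byte[] request);
}

// Decorator that records each call before delegating to the wrapped channel.
final class LoggingChannel implements SimpleChannel {
    private final SimpleChannel delegate;
    final List<String> log = new ArrayList<>();

    LoggingChannel(SimpleChannel delegate) {
        this.delegate = delegate;
    }

    @Override
    public byte[] call(String method, byte[] request) {
        log.add("call " + method);
        return delegate.call(method, request);
    }
}

// Hypothetical stand-in for a generated stub: a typed API on top of the channel.
final class GreeterStub {
    private final SimpleChannel channel;

    GreeterStub(SimpleChannel channel) {
        this.channel = channel;
    }

    String greet(String name) {
        byte[] response = channel.call("Greeter/greet", name.getBytes(StandardCharsets.UTF_8));
        return new String(response, StandardCharsets.UTF_8);
    }
}

final class LayersDemo {
    // A fake transport that answers every request with "hello <request>".
    static ByteTransport echoTransport() {
        return (method, request) ->
                ("hello " + new String(request, StandardCharsets.UTF_8))
                        .getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        LoggingChannel channel = new LoggingChannel(echoTransport()::send);
        GreeterStub stub = new GreeterStub(channel);
        System.out.println(stub.greet("nacos"));
        System.out.println(channel.log);
    }
}
```

The application code only sees GreeterStub, which is why logging, monitoring or auth can be added at the channel level without touching call sites.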
Four communication modes
- Unary RPC: one request, one response
rpc getRealNameByUsername (StudentUsernameRequest) returns (StudentResponse) {}
- Server-streaming RPC: the server returns a stream of responses
rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
- Client-streaming RPC: the client sends a stream of requests
rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
- Bidirectional-streaming RPC: both sides stream
rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
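The four call shapes can be sketched without any gRPC dependency. In the sketch below, CallObserver is a hypothetical interface that mirrors the three callbacks of a stream observer (onNext, onError, onCompleted), and the method bodies are invented purely to show who sends how many messages in each mode.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the three stream-observer callbacks.
interface CallObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

final class CallShapes {
    // Unary: one request in, one response out.
    static String unary(String request) {
        return request.toUpperCase();
    }

    // Server streaming: one request in, several responses pushed to the observer.
    static void serverStreaming(String request, CallObserver<String> responses) {
        for (int i = 1; i <= 3; i++) {
            responses.onNext(request + i);
        }
        responses.onCompleted();
    }

    // Client streaming: many requests in, a single response once the client completes.
    static CallObserver<String> clientStreaming(final CallObserver<Integer> response) {
        return new CallObserver<String>() {
            private int count;
            @Override public void onNext(String value) { count++; }
            @Override public void onError(Throwable t) { response.onError(t); }
            @Override public void onCompleted() {
                response.onNext(count);
                response.onCompleted();
            }
        };
    }

    // Bidirectional streaming: each request may produce a response immediately.
    static CallObserver<String> bidirectional(final CallObserver<String> responses) {
        return new CallObserver<String>() {
            @Override public void onNext(String value) { responses.onNext("echo:" + value); }
            @Override public void onError(Throwable t) { responses.onError(t); }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }

    // Helper that records every event as text so the flow can be inspected.
    static <T> CallObserver<T> collecting(final List<String> events) {
        return new CallObserver<T>() {
            @Override public void onNext(T value) { events.add("next:" + value); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onCompleted() { events.add("completed"); }
        };
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        CallObserver<String> requests = clientStreaming(CallShapes.<Integer>collecting(events));
        requests.onNext("a");
        requests.onNext("b");
        requests.onCompleted();
        System.out.println(events);
    }
}
```

Note that in both client-streaming and bidirectional modes the service method returns an observer for the incoming requests, which is the same shape the generated gRPC service methods use later in this article.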
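Advanced topics
- Interceptors
- Stream Tracer: stream-level interception
- Retry Policy: client-side retries
- NameResolver: service discovery
- Load balancing
- gRPC and microservices: integration with Dubbo, gateway, JWT, Nacos 2.x, and OpenFeign
Basic example
This section uses a simple example to show how to use grpc-java.
The .proto file
The .proto file must be placed in the src/main/proto directory: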
syntax = "proto3";
package org.net5ijy.grpc.auto;
option java_package = "org.net5ijy.grpc.auto";
option java_outer_classname = "StudentRpc";
option java_multiple_files = true;
service StudentService {
rpc getRealNameByUsername (StudentUsernameRequest) returns (StudentResponse) {}
rpc getRealNameByUsernameLike (StudentUsernameRequest) returns (stream StudentResponse) {}
rpc getRealNameByUsernames (stream StudentUsernameRequest) returns (StudentResponseList) {}
rpc getRealNamesByUsernames (stream StudentUsernameRequest) returns (stream StudentResponse) {}
}
message StudentUsernameRequest {
string username = 1;
}
message StudentResponse {
string realName = 1;
}
message StudentResponseList {
repeated StudentResponse studentResponse = 1;
}
POM dependencies
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.56.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.56.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.56.0</version>
</dependency>
POM plugins
<build>
<finalName>${project.artifactId}</finalName>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<!-- mvn protobuf:compile -->
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:1.56.0:exe:${os.detected.classifier}
</pluginArtifact>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
<skip>false</skip>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Compiling and generating code
mvn clean compile
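Writing the service implementation
import io.grpc.stub.StreamObserver;
import java.util.Random;
// Generated classes; the package comes from java_package in the .proto file
import org.net5ijy.grpc.auto.*;
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {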
@Override
public void getRealNameByUsername(StudentUsernameRequest request,
StreamObserver<StudentResponse> responseObserver) {
String username = request.getUsername();
System.out.printf("username=%s\n", username);
StudentResponse response = StudentResponse.newBuilder().setRealName("徐國峰").build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void getRealNameByUsernameLike(StudentUsernameRequest request,
StreamObserver<StudentResponse> responseObserver) {
String username = request.getUsername();
System.out.printf("username=%s\n", username);
responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰1").build());
responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰2").build());
responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰3").build());
responseObserver.onNext(StudentResponse.newBuilder().setRealName("徐國峰4").build());
responseObserver.onCompleted();
}
@Override
public StreamObserver<StudentUsernameRequest> getRealNameByUsernames(
StreamObserver<StudentResponseList> responseObserver) {
return new StreamObserver<StudentUsernameRequest>() {
@Override
public void onNext(StudentUsernameRequest request) {
System.out.printf("username=%s\n", request.getUsername());
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
StudentResponse response1 = StudentResponse.newBuilder().setRealName("徐國峰5").build();
StudentResponse response2 = StudentResponse.newBuilder().setRealName("徐國峰6").build();
StudentResponse response3 = StudentResponse.newBuilder().setRealName("徐國峰7").build();
StudentResponse response4 = StudentResponse.newBuilder().setRealName("徐國峰8").build();
StudentResponseList responseList = StudentResponseList.newBuilder()
.addStudentResponse(response1)
.addStudentResponse(response2)
.addStudentResponse(response3)
.addStudentResponse(response4)
.build();
responseObserver.onNext(responseList);
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<StudentUsernameRequest> getRealNamesByUsernames(
StreamObserver<StudentResponse> responseObserver) {
return new StreamObserver<StudentUsernameRequest>() {
@Override
public void onNext(StudentUsernameRequest request) {
System.out.printf("username=%s\n", request.getUsername());
StudentResponse response = StudentResponse.newBuilder()
.setRealName("徐國峰" + new Random().nextInt(10)).build();
responseObserver.onNext(response);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
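Server code
import io.grpc.Attributes;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class StudentGrpcServer {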
private static final AtomicInteger COUNT = new AtomicInteger(0);
static final int GRPC_SERVER_PORT = 50051;
private Server server;
private void start() throws IOException {
// grpc server executor
Executor executor = new ThreadPoolExecutor(8, 16, 120, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> {
Thread t = new Thread(r);
t.setName("stu-grpc-server-" + COUNT.incrementAndGet());
return t;
});
/* The port on which the server should run */
this.server = ServerBuilder.forPort(GRPC_SERVER_PORT).executor(executor)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addService(new StudentServiceImpl())
.intercept(serverInterceptor())
.addTransportFilter(serverTransportFilter())
.build();
this.server.start();
System.out.println("Server started, listening on " + GRPC_SERVER_PORT);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
StudentGrpcServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("*** server shut down");
}));
}
private void stop() throws InterruptedException {
if (this.server != null && !this.server.isShutdown()) {
this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
private void blockUntilShutdown() throws InterruptedException {
if (this.server != null) {
this.server.awaitTermination();
}
}
private ServerInterceptor serverInterceptor() {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
Context ctx = Context.current();
return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);
}
};
}
private ServerTransportFilter serverTransportFilter() {
return new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
return super.transportReady(transportAttrs);
}
@Override
public void transportTerminated(Attributes transportAttrs) {
super.transportTerminated(transportAttrs);
}
};
}
public static void main(String[] args) throws IOException, InterruptedException {
final StudentGrpcServer server = new StudentGrpcServer();
server.start();
server.blockUntilShutdown();
}
}
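The executor that the server passes to ServerBuilder can be exercised on its own. The following sketch uses only the JDK; the class and method names (NamedPool, namedFactory, newServerExecutor, threadNameOfFirstTask) are hypothetical, while the pool sizes and the thread-name prefix are copied from the server code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedPool {
    // Builds a thread factory that names threads "<prefix>1", "<prefix>2", ...
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(0);
        return runnable -> {
            Thread t = new Thread(runnable);
            t.setName(prefix + counter.incrementAndGet());
            return t;
        };
    }

    // Same sizing as the server above: 8 core threads, 16 max, bounded queue of 1000.
    static ExecutorService newServerExecutor(String prefix) {
        return new ThreadPoolExecutor(8, 16, 120, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000), namedFactory(prefix));
    }

    // Runs one task and reports the name of the thread that executed it.
    static String threadNameOfFirstTask(String prefix) {
        ExecutorService pool = newServerExecutor(prefix);
        try {
            return pool.submit(() -> Thread.currentThread().getName())
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameOfFirstTask("stu-grpc-server-"));
    }
}
```

With a bounded LinkedBlockingQueue, ThreadPoolExecutor only creates threads beyond the core size once the queue is full. When both the queue and the maximum pool size are exhausted, the default policy rejects new tasks with RejectedExecutionException, so these numbers effectively cap how much work the server will accept.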
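Client code
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
// Generated classes; the package comes from java_package in the .proto file
import org.net5ijy.grpc.auto.*;
public class StudentGrpcClient {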
private final StudentServiceGrpc.StudentServiceBlockingStub blockingStub;
private final StudentServiceGrpc.StudentServiceStub stub;
public StudentGrpcClient(Channel channel) {
// 'channel' here is a Channel, not a ManagedChannel, so it is not this code's responsibility to
// shut it down.
// Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
this.blockingStub = StudentServiceGrpc.newBlockingStub(channel);
this.stub = StudentServiceGrpc.newStub(channel);
}
public void getRealNameByUsername(String username) {
StudentUsernameRequest request = StudentUsernameRequest
.newBuilder().setUsername(username).build();
try {
StudentResponse response = this.blockingStub.getRealNameByUsername(request);
System.out.printf("Real name=%s\n", response.getRealName());
} catch (StatusRuntimeException e) {
System.err.println(e.getMessage());
}
}
public void getRealNameByUsernameLike(String username) {
StudentUsernameRequest request = StudentUsernameRequest
.newBuilder().setUsername(username).build();
try {
Iterator<StudentResponse> iterator = this.blockingStub.getRealNameByUsernameLike(request);
iterator.forEachRemaining(r -> System.out.printf("Real name=%s\n", r.getRealName()));
} catch (StatusRuntimeException e) {
System.err.println(e.getMessage());
}
}
public void getRealNameByUsernames(String username) {
StudentUsernameRequest request = StudentUsernameRequest
.newBuilder().setUsername(username).build();
try {
StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
.getRealNameByUsernames(new StreamObserver<StudentResponseList>() {
@Override
public void onNext(StudentResponseList responseList) {
responseList.getStudentResponseList()
.forEach(r -> System.out.printf("Real name=%s\n", r.getRealName()));
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("getRealNameByUsernames completed");
}
});
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onCompleted();
} catch (StatusRuntimeException e) {
e.printStackTrace();
}
}
public void getRealNamesByUsernames(String username) {
StudentUsernameRequest request = StudentUsernameRequest
.newBuilder().setUsername(username).build();
try {
StreamObserver<StudentUsernameRequest> requestStreamObserver = this.stub
.getRealNamesByUsernames(new StreamObserver<StudentResponse>() {
@Override
public void onNext(StudentResponse response) {
System.out.printf("Real name=%s\n", response.getRealName());
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("getRealNamesByUsernames completed");
}
});
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onNext(request);
requestStreamObserver.onCompleted();
} catch (StatusRuntimeException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", StudentGrpcServer.GRPC_SERVER_PORT).usePlaintext().build();
try {
StudentGrpcClient client = new StudentGrpcClient(channel);
int count = 1;
for (int i = 0; i < count; i++) {
client.getRealNameByUsername("admin2018");
Thread.sleep(20);
System.out.println("---");
client.getRealNameByUsernameLike("admin2019");
Thread.sleep(20);
System.out.println("---");
client.getRealNameByUsernames("admin2020");
Thread.sleep(20);
System.out.println("---");
client.getRealNamesByUsernames("admin2021");
}
Thread.sleep(10000);
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}
Client code (FutureStub)
The future stub applies only to unary RPC calls, with a single request and a single response:
try {
ListenableFuture<StudentResponse> future = futureStub.getRealNameByUsername(
StudentUsernameRequest.newBuilder().setUsername(username).build());
// Block and wait for the result
// StudentResponse studentResponse = future.get();
CountDownLatch latch = new CountDownLatch(1);
Futures.addCallback(future, new FutureCallback<StudentResponse>() {
@Override
public void onSuccess(StudentResponse response) {
System.out.printf("Real name=%s\n", response.getRealName());
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
System.err.println(t.getMessage());
latch.countDown();
}
}, Executors.newSingleThreadExecutor());
latch.await();
} catch (StatusRuntimeException | InterruptedException e) {
System.err.println(e.getMessage());
}
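The callback-plus-latch pattern above can be tried without gRPC or Guava. The sketch below swaps in the JDK's CompletableFuture for ListenableFuture, and whenComplete for Futures.addCallback; fakeUnaryCall is a hypothetical stand-in for the future stub call and simply returns a fixed value asynchronously.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class FutureCallbackDemo {
    // Stand-in for a future-stub call; completes asynchronously with a fixed value.
    static CompletableFuture<String> fakeUnaryCall(String username) {
        return CompletableFuture.supplyAsync(() -> "real-name-of-" + username);
    }

    // Registers a callback, then waits on a latch until the callback has fired.
    static String callAndWait(String username) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        fakeUnaryCall(username).whenComplete((value, error) -> {
            // Exactly one of value/error is set; count down in both cases.
            result.set(error == null ? value : "error: " + error.getMessage());
            latch.countDown();
        });
        try {
            // A bounded wait avoids hanging forever if the callback never fires.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                return "timeout";
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(callAndWait("admin2018"));
    }
}
```

Counting down in both the success and failure paths matters: if only the success path released the latch, a failed call would block the waiting thread until the timeout.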
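Using gRPC in Nacos
In Nacos, the proto file does not define every interface. It defines only the basic forwarding interfaces and a generic request/response Payload structure.
The concrete request and response structures are written in business code, and business interfaces are routed through the forwarding interface, much as DispatcherServlet in Spring MVC forwards requests to controllers.
This section briefly describes how Nacos integrates gRPC.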
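The idea of a single generic entry point that routes by message type can be sketched in a few lines of plain Java. Envelope and HandlerRegistry are hypothetical names invented for this illustration, and the string body stands in for a serialized request; this is not the Nacos implementation, only the routing pattern it is built around.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical generic envelope: a type name plus an opaque body.
final class Envelope {
    final String type;
    final String body;

    Envelope(String type, String body) {
        this.type = type;
        this.body = body;
    }
}

// Hypothetical registry that routes an envelope to a handler by its type name.
final class HandlerRegistry {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void register(String type, Function<String, String> handler) {
        handlers.put(type, handler);
    }

    // The single generic entry point, comparable to one forwarding RPC method.
    Envelope dispatch(Envelope request) {
        Function<String, String> handler = handlers.get(request.type);
        if (handler == null) {
            return new Envelope("ErrorResponse", "no handler for " + request.type);
        }
        String responseType = request.type.replace("Request", "Response");
        return new Envelope(responseType, handler.apply(request.body));
    }
}

final class DispatchDemo {
    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        registry.register("EchoRequest", String::toUpperCase);
        Envelope response = registry.dispatch(new Envelope("EchoRequest", "hi"));
        System.out.println(response.type + " " + response.body);
    }
}
```

The trade-off of this design is that the wire contract stays tiny and stable, while type safety moves from the generated stubs into the registry and the deserialization step.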
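Server side
BaseGrpcServer
The abstract class BaseRpcServer defines the skeleton logic of the RPC server. Its template method startServer() must be implemented by subclasses and contains the core logic for starting the RPC server.
The abstract class BaseGrpcServer extends BaseRpcServer and wraps the gRPC components:
- Server: the gRPC server object
- GrpcRequestAcceptor: receives and forwards business requests
- GrpcBiStreamRequestAcceptor: handles connection requests and obtains the StreamObserver used to send on the bidirectional stream
- ConnectionManager: the connection manager
The startServer() method wraps the logic for starting the gRPC server: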
public void startServer() throws Exception {
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
// server interceptor to set connection id.
ServerInterceptor serverInterceptor = new ServerInterceptor() {
@Override
public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
ServerCallHandler<T, S> next) {
// Store the connectionId, IP, port, etc. in the Context
Context ctx = Context.current()
.withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
.withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
.withValue(CONTEXT_KEY_CONN_REMOTE_PORT,
call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
.withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
Channel internalChannel = getInternalChannel(call);
// Store the channel
ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
}
return Contexts.interceptCall(ctx, call, headers, next);
}
};
// Register the forwarding services
addServices(handlerRegistry, serverInterceptor);
// Create the Server
server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
.maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addTransportFilter(new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
// When the connection is established, read the IP, port, etc. and generate the connectionId
InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
int remotePort = remoteAddress.getPort();
int localPort = localAddress.getPort();
String remoteIp = remoteAddress.getAddress().getHostAddress();
Attributes attrWrapper = transportAttrs.toBuilder()
.set(TRANS_KEY_CONN_ID,
System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
.set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
.set(TRANS_KEY_LOCAL_PORT, localPort).build();
return attrWrapper;
}
@Override
public void transportTerminated(Attributes transportAttrs) {
String connectionId = null;
try {
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(connectionId)) {
// When the connection is closed, remove it from the connectionManager
connectionManager.unregister(connectionId);
}
}
}).build();
server.start();
}
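The connection ID built in transportReady has the form timestamp, remote IP, remote port, joined by underscores. A small JDK-only sketch of that formatting follows; ConnectionIds and newConnectionId are hypothetical names, and the timestamp is passed in as a parameter (rather than read from System.currentTimeMillis()) only so that the result is reproducible.

```java
import java.net.InetSocketAddress;

final class ConnectionIds {
    // Formats "<timestamp>_<remoteIp>_<remotePort>", as in transportReady above.
    static String newConnectionId(long timestampMillis, InetSocketAddress remote) {
        String remoteIp = remote.getAddress().getHostAddress();
        return timestampMillis + "_" + remoteIp + "_" + remote.getPort();
    }

    public static void main(String[] args) {
        // An IP literal is parsed directly, so no DNS lookup is involved.
        InetSocketAddress remote = new InetSocketAddress("10.0.0.1", 5000);
        System.out.println(newConnectionId(System.currentTimeMillis(), remote));
    }
}
```

Because the ID is attached to the transport attributes when the connection is established, every later call on that connection can read the same ID from the Context, and the timestamp distinguishes a reconnect from the same remote IP and port at a later time.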
GrpcBiStreamRequestAcceptor
grpc bi stream request handler.
Its main job is to build the server-side Connection object:
if (parseObj instanceof ConnectionSetupRequest) {
ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
Map<String, String> labels = setUpRequest.getLabels();
String appName = "-";
if (labels != null && labels.containsKey(Constants.APPNAME)) {
appName = labels.get(Constants.APPNAME);
}
ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
metaInfo.setTenant(setUpRequest.getTenant());
// Wrap the basic connection information together with the responseObserver and channel
// The responseObserver is used to push messages to the client
Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
connection.setAbilities(setUpRequest.getAbilities());
boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
// Register with the connectionManager
if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
try {
connection.request(new ConnectResetRequest(), 3000L);
connection.close();
} catch (Exception e) {
}
}
}
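The register-or-reject step can be illustrated with a much simpler registry. The sketch below is an assumption-laden model, not the Nacos ConnectionManager: SimpleConnectionRegistry, its capacity limit, and the rule that duplicate IDs are refused are all invented here to show why register returns a boolean that the caller must check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry; the limit and duplicate rule are illustrative assumptions.
final class SimpleConnectionRegistry {
    private final Map<String, String> connections = new HashMap<>();
    private final int maxConnections;

    SimpleConnectionRegistry(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    // Returns false if the registry is full or the id is already registered.
    synchronized boolean register(String connectionId, String appName) {
        if (connections.size() >= maxConnections || connections.containsKey(connectionId)) {
            return false;
        }
        connections.put(connectionId, appName);
        return true;
    }

    // Removes the connection, for example when the transport terminates.
    synchronized void unregister(String connectionId) {
        connections.remove(connectionId);
    }

    synchronized int size() {
        return connections.size();
    }
}

final class RegistryDemo {
    public static void main(String[] args) {
        SimpleConnectionRegistry registry = new SimpleConnectionRegistry(1);
        System.out.println(registry.register("conn-a", "app"));
        System.out.println(registry.register("conn-b", "app"));
    }
}
```

When registration fails, the caller in the code above asks the client to reset and closes the connection, so a rejected client is told to reconnect rather than being left with a half-open stream.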
GrpcRequestAcceptor
Handles business requests by forwarding them to a RequestHandler:
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
String type = grpcRequest.getMetadata().getType();
// Look up the RequestHandler
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
// no handler found
if (requestHandler == null) {
Payload payloadResponse = GrpcUtils
.convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// Deserialize the request body
Object parseObj = null;
try {
parseObj = GrpcUtils.parse(grpcRequest);
} catch (Exception e) {
Payload payloadResponse = GrpcUtils.convert(
buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// Handle the business request
Request request = (Request) parseObj;
try {
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// Refresh the client's active time
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
// Handle the request with the RequestHandler
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
// Send the response
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
} catch (Throwable e) {
Payload payloadResponse = GrpcUtils.convert(
buildErrorResponse((e instanceof NacosException) ?
((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
e.getMessage()));
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
}
}
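The final catch block above turns any failure into an error response, keeping the specific error code when the exception carries one and falling back to a generic failure code otherwise. A minimal JDK-only sketch of that mapping follows. CodedException is a hypothetical stand-in for an exception type with an error code, and the numeric values 200 and 500 are assumptions chosen for the example.

```java
import java.util.function.Supplier;

// Hypothetical exception carrying an error code.
final class CodedException extends RuntimeException {
    final int code;

    CodedException(int code, String message) {
        super(message);
        this.code = code;
    }
}

final class SafeInvoker {
    static final int SUCCESS = 200; // assumed success code
    static final int FAIL = 500;    // assumed generic failure code

    // Runs the handler and converts the outcome into "<code>:<message>".
    static String invoke(Supplier<String> handler) {
        try {
            return SUCCESS + ":" + handler.get();
        } catch (Throwable e) {
            // Keep the specific code when available, otherwise use the generic one.
            int code = (e instanceof CodedException) ? ((CodedException) e).code : FAIL;
            return code + ":" + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(invoke(() -> "ok"));
        System.out.println(invoke(() -> { throw new CodedException(403, "denied"); }));
    }
}
```

Always answering, even on failure, is what lets the caller's stream complete normally instead of waiting for a response that never arrives.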
Client side
The client uses the GrpcSdkClient class. The logic for connecting to the server lives in the connectToServer method of its parent class, GrpcClient:
public Connection connectToServer(ServerInfo serverInfo) {
try {
if (grpcExecutor == null) {
int threadNumber = ThreadUtils.getSuitableThreadCount(8);
grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(
"nacos-grpc-client-executor-%d").build());
grpcExecutor.allowCoreThreadTimeOut(true);
}
int port = serverInfo.getServerPort() + rpcPortOffset();
// Create the gRPC client stub
RequestGrpc.RequestFutureStub newChannelStubTemp =
createNewChannelStub(serverInfo.getServerIp(), port);
if (newChannelStubTemp != null) {
// Check that the server is available
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
return null;
}
// Create the biStreamStub
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
// Create the connection
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
// create stream request and bind connection event to this connection.
// Used to send requests to the server
StreamObserver<Payload> payloadStreamObserver =
bindRequestStream(biRequestStreamStub, grpcConn);
// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
// Send a ConnectionSetupRequest so that the server creates a Connection
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities);
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
return grpcConn;
}
return null;
} catch (Exception e) {}
return null;
}
private StreamObserver<Payload> bindRequestStream(
final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
final GrpcConnection grpcConn) {
return streamStub.requestBiStream(new StreamObserver<Payload>() {
@Override
public void onNext(Payload payload) {
try {
Object parseBody = GrpcUtils.parse(payload);
final Request request = (Request) parseBody;
if (request != null) {
try {
// Handle the data sent by the server with the client-side ServerRequestHandler
Response response = handleServerRequest(request);
if (response != null) {
response.setRequestId(request.getRequestId());
// Send the response
sendResponse(response);
}
} catch (Exception e) {
sendResponse(request.getRequestId(), false);
}
}
} catch (Exception e) {
// ...
}
}
@Override
public void onError(Throwable throwable) {
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
} else {
// ...
}
}
@Override
public void onCompleted() {
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
}
}
});
}
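Both onError and onCompleted guard the failover with compareAndSet, so that only one caller moves the client from RUNNING to UNHEALTHY and triggers the server switch. The following JDK-only sketch isolates that guard. FailoverGuard is a hypothetical class, and a counter stands in for the call to switchServerAsync().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class FailoverGuard {
    enum Status { RUNNING, UNHEALTHY }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.RUNNING);
    private final AtomicInteger switchCount = new AtomicInteger();

    // Called from stream callbacks; only the caller that wins the CAS triggers a switch.
    void onStreamBroken() {
        if (status.compareAndSet(Status.RUNNING, Status.UNHEALTHY)) {
            switchCount.incrementAndGet(); // stands in for switchServerAsync()
        }
    }

    // Called once a new connection has been established.
    void markRunning() {
        status.set(Status.RUNNING);
    }

    int switchCount() {
        return switchCount.get();
    }

    Status status() {
        return status.get();
    }

    public static void main(String[] args) {
        FailoverGuard guard = new FailoverGuard();
        guard.onStreamBroken();
        guard.onStreamBroken(); // ignored: status is already UNHEALTHY
        System.out.println(guard.switchCount());
    }
}
```

Without the compare-and-set, an error followed by a completion event on the same broken stream, or callbacks arriving on different threads, could each start a reconnect and race against one another.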