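This article starts from a service registration example and reads through the client and server source code to analyze how service registration and service discovery work in Nacos.
The analysis is based on version 2.0.2.
Client
Creating the NacosNamingService object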
NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
NacosNamingService provides two constructors:
public NacosNamingService(String serverList) throws NacosException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
init(properties);
}
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
The keys accepted by the properties argument of the second constructor are defined in the PropertyKeyConst constants class, for example:
- namespace
- username
- password
- serverAddr
- clusterName
- and others
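As a minimal sketch of what the second constructor receives, the snippet below builds a Properties object with the key names listed above. It uses only java.util.Properties so that it runs without the Nacos dependency; the class name NamingPropertiesSketch and all values (address, namespace id, credentials) are placeholders. In real code the keys would normally be taken from PropertyKeyConst and the result passed to new NacosNamingService(properties).

```java
import java.util.Properties;

class NamingPropertiesSketch {
    // Builds the Properties that would be handed to NacosNamingService(Properties).
    // Every value here is a placeholder, not a default taken from Nacos.
    static Properties build(String serverAddr, String namespace) {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", serverAddr);
        properties.setProperty("namespace", namespace);
        properties.setProperty("username", "demo-user");
        properties.setProperty("password", "demo-password");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("127.0.0.1:8848", "dev");
        System.out.println(properties.getProperty("serverAddr"));
    }
}
```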
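The constructor initializes several parameters and components:
- Initializes the namespace parameter.
- Creates an InstancesChangeNotifier object, which extends Subscriber and listens for InstancesChangeEvent:
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    // The key is composed of serviceName + groupName + clusters
    // The value is the set of listeners
    private final Map<String, ConcurrentHashSet<EventListener>> listenerMap;
    // Lock
    private final Object lock = new Object();
    // ...
}
- Registers InstancesChangeEvent with NotifyCenter and subscribes the InstancesChangeNotifier created above, so that it is notified when service instances change:
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
// NotifyCenter maintains a set of EventPublishers; each Subscriber is registered on an EventPublisher
// EventPublisher provides a publish method that pushes events onto an event queue
// The publisher runs as a thread whose run method takes events off the queue and notifies the Subscribers
- Creates a NamingClientProxyDelegate object for communicating with the server. It is a delegate that internally uses other NamingClientProxy implementations:
  - NamingHttpClientProxy
  - NamingGrpcClientProxy: used by default. It includes a healthCheck that probes whether the server is healthy; the server simply responds with success and does nothing else.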
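The listenerMap idea described above can be illustrated with a small standalone registry. This is only a sketch under stated assumptions: listeners are modeled as Consumer<String>, the payload is a plain string, and the key format in key(...) is hypothetical (the real InstancesChangeNotifier builds its key with its own helper).

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class ListenerRegistrySketch {
    // key: serviceName + groupName + clusters, value: listeners registered for that key
    private final Map<String, Set<Consumer<String>>> listenerMap = new ConcurrentHashMap<>();

    // Hypothetical key format, chosen only for this sketch.
    static String key(String serviceName, String groupName, String clusters) {
        return groupName + "@@" + serviceName + "@@" + clusters;
    }

    void register(String serviceName, String groupName, String clusters, Consumer<String> listener) {
        listenerMap.computeIfAbsent(key(serviceName, groupName, clusters),
                k -> ConcurrentHashMap.newKeySet()).add(listener);
    }

    // Dispatches a change notification to the listeners of one key and returns how many were called.
    int onInstancesChanged(String serviceName, String groupName, String clusters, String payload) {
        Set<Consumer<String>> listeners = listenerMap.get(key(serviceName, groupName, clusters));
        if (listeners == null) {
            return 0;
        }
        listeners.forEach(listener -> listener.accept(payload));
        return listeners.size();
    }
}
```

A listener registered for one service is not called for events of another service, which is the point of keying the map by service, group and clusters.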
Service registration
NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
nacosNamingService.registerInstance(ORDER_SERVICE, "192.168.0.100", 9999);
Several overloaded registerInstance methods are provided; they all end up calling these methods:
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException {
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setWeight(1.0);
instance.setClusterName(clusterName);
registerInstance(serviceName, groupName, instance);
}
public void registerInstance(String serviceName, String groupName, Instance instance)
throws NacosException {
// clientProxy here is the NamingClientProxyDelegate object
clientProxy.registerService(serviceName, groupName, instance);
}
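The registerService method of NamingClientProxyDelegate selects a concrete NamingClientProxy object to talk to the server; by default it uses the NamingGrpcClientProxy object.
The registerService method of NamingGrpcClientProxy builds an InstanceRequest object, then uses the RpcClient object to send the request and receive the response.
Internally, RpcClient accesses the server over gRPC through a GrpcConnection object.
The underlying gRPC code is generated with protoc and protobuf-maven-plugin; the communication details are not covered here.
Service deregistration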
nacosNamingService.deregisterInstance(ORDER_SERVICE, "192.168.0.100", 9999);
The deregisterInstance method takes a service instance offline:
public void deregisterInstance(String serviceName,
String groupName,
String ip,
int port,
String clusterName) throws NacosException {
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setClusterName(clusterName);
deregisterInstance(serviceName, groupName, instance);
}
public void deregisterInstance(String serviceName,
String groupName,
Instance instance) throws NacosException {
clientProxy.deregisterService(serviceName, groupName, instance);
}
Querying instances
Example code:
NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
List<Instance> instances = namingService.getAllInstances(ORDER_SERVICE, true);
System.out.printf(">> instance count=%d\n", instances.size());
for (Instance instance : instances) {
System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
instance.getServiceName(), instance.getInstanceId(),
instance.getClusterName(), instance.getIp(), instance.getPort());
}
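Several overloaded getAllInstances methods are provided. The most important parameter is subscribe: when it is true, the client sends a subscribe request to the server and from then on reads the service instance information from ServiceInfoHolder instead of sending further query requests to the server.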
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
// Subscribe request
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
// Query request
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
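The effect of the subscribe flag can be sketched without Nacos on the classpath. The example below is an assumption-laden simplification: a plain map stands in for ServiceInfoHolder, a Function stands in for the server call, and the sketch writes the subscribe result into the cache itself, whereas in the real client the cache is maintained by ServiceInfoHolder. The class and field names are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class InstanceLookupSketch {
    // Stands in for ServiceInfoHolder: service key -> known hosts
    private final Map<String, List<String>> localCache = new ConcurrentHashMap<>();
    // Stands in for the request sent to the server
    private final Function<String, List<String>> remote;
    int remoteCalls = 0;

    InstanceLookupSketch(Function<String, List<String>> remote) {
        this.remote = remote;
    }

    List<String> getAllInstances(String serviceKey, boolean subscribe) {
        List<String> hosts;
        if (subscribe) {
            hosts = localCache.get(serviceKey);
            if (hosts == null) {
                // First call: subscribe, then keep the result locally
                remoteCalls++;
                hosts = remote.apply(serviceKey);
                if (hosts != null) {
                    localCache.put(serviceKey, hosts);
                }
            }
        } else {
            // Plain query: always goes to the server
            remoteCalls++;
            hosts = remote.apply(serviceKey);
        }
        return hosts == null ? Collections.<String>emptyList() : hosts;
    }
}
```

With subscribe set to true, repeated lookups of the same service cause a single remote call; with false, every lookup reaches the server.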
Service subscription
Example code:
NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
namingService.subscribe(ORDER_SERVICE, new EventListener() {
@Override
public void onEvent(Event event) {
NamingEvent e = (NamingEvent) event;
System.out.println("serviceName=" + e.getServiceName());
List<Instance> instances = e.getInstances();
System.out.printf(">> instance count=%d\n", instances.size());
for (Instance instance : instances) {
System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
instance.getServiceName(), instance.getInstanceId(),
instance.getClusterName(), instance.getIp(), instance.getPort());
}
}
});
TimeUnit.SECONDS.sleep(1200);
The subscribe method:
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
String clusterString = StringUtils.join(clusters, ",");
// Save the listener into listenerMap
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
// Send the subscribe request
clientProxy.subscribe(serviceName, groupName, clusterString);
}
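Call path when instances change:
When the client receives an instance change notification from the server, the observer registered at the gRPC layer is triggered: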
public void onMessage(RespT message) {
if (firstResponseReceived && !streamingResponse) {
throw Status.INTERNAL
.withDescription("More than one responses received for unary or client-streaming call")
.asRuntimeException();
}
firstResponseReceived = true;
// Invoke the observer
observer.onNext(message);
if (streamingResponse && adapter.autoFlowControlEnabled) {
// Request delivery of the next inbound message.
adapter.request(1);
}
}
The observer here is registered when the RPC connection is created:
private StreamObserver<Payload> bindRequestStream(
final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
final GrpcConnection grpcConn) {
return streamStub.requestBiStream(new StreamObserver<Payload>() {
@Override
public void onNext(Payload payload) {
try {
Object parseBody = GrpcUtils.parse(payload);
final Request request = (Request) parseBody;
if (request != null) {
try {
// Call the ServerRequestHandler to handle the request
Response response = handleServerRequest(request);
if (response != null) {
response.setRequestId(request.getRequestId());
sendResponse(response);
}
// ...
The handling logic of NamingPushRequestHandler:
public Response requestReply(Request request) {
if (request instanceof NotifySubscriberRequest) {
NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
return new NotifySubscriberResponse();
}
return null;
}
The serviceInfoHolder.processServiceInfo method:
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
// Publish an InstancesChangeEvent
NotifyCenter.publishEvent(new InstancesChangeEvent(
serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
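The core of processServiceInfo is "replace the cached value, and publish an event only if something changed". The sketch below shows that pattern in isolation. It is a simplification under assumptions: hosts are plain strings, an empty push is the only case treated as "empty or error", and change detection compares host sets, whereas the real isEmptyOrErrorPush and isChangedServiceInfo check more conditions. A counter stands in for NotifyCenter.publishEvent.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceInfoCacheSketch {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    int publishedEvents = 0;

    // Returns the hosts held in the cache for the key after processing the push.
    List<String> process(String serviceKey, List<String> pushedHosts) {
        List<String> old = serviceInfoMap.get(serviceKey);
        if (pushedHosts == null || pushedHosts.isEmpty()) {
            // Treated as an empty or erroneous push: keep the previous value
            return old;
        }
        serviceInfoMap.put(serviceKey, pushedHosts);
        boolean changed = old == null || !new HashSet<>(old).equals(new HashSet<>(pushedHosts));
        if (changed) {
            publishedEvents++; // stands in for NotifyCenter.publishEvent(...)
        }
        return pushedHosts;
    }
}
```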
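Publishing an InstancesChangeEvent works as follows:
- NotifyCenter maintains a set of EventPublishers; when an event arrives, it selects the target EventPublisher.
- The publish method stores the event in an event queue:
public boolean publish(Event event) {
    checkIsStart();
    boolean success = this.queue.offer(event);
    if (!success) {
        // If the queue rejects the event, handle it directly on the current thread
        receiveEvent(event);
        return true;
    }
    return true;
}
- The EventPublisher runs as a thread that is started when NotifyCenter initializes. Its run method takes events from the queue and handles them with receiveEvent(event).
- receiveEvent looks up all Subscribers, which include the InstancesChangeNotifier created at the beginning, and calls each subscriber's onEvent method.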
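The queue-plus-fallback behaviour of publish can be reproduced in a few lines. The sketch below is not the Nacos publisher: events are plain strings, subscribers are Consumer<String>, and drain() performs one pass of what the publisher thread's run loop would do continuously, so that the example stays deterministic without starting a thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class QueuePublisherSketch {
    private final BlockingQueue<String> queue;
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    QueuePublisherSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void addSubscriber(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Enqueue the event; if the queue is full, handle it on the caller's thread instead.
    boolean publish(String event) {
        if (!queue.offer(event)) {
            receiveEvent(event);
        }
        return true;
    }

    // One pass of the consumer loop: take queued events and notify subscribers.
    void drain() {
        String event;
        while ((event = queue.poll()) != null) {
            receiveEvent(event);
        }
    }

    private void receiveEvent(String event) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}
```

One consequence worth noting: when the queue is full, the overflowing event is delivered before the events still waiting in the queue, so ordering is not strictly preserved in that case.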
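Server
Service registration
The InstanceRequestHandler handler
The registry's RPC handlers live in the com.alibaba.nacos.naming.remote.rpc.handler package. The handler for service registration and deregistration is the InstanceRequestHandler class: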
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
private final EphemeralClientOperationServiceImpl clientOperationService;
public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
this.clientOperationService = clientOperationService;
}
@Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType()) {
// Service registration
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
// Service deregistration
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
}
}
// Service registration
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
// Service deregistration
private InstanceResponse deregisterInstance(
Service service, InstanceRequest request, RequestMeta meta) {
clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
}
}
Core flow of service registration
public void registerInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
InstancePublishInfo instanceInfo = getPublishInfo(instance);
// Add a new instance for service for current client
// 1. Bind the service -> instance relation to the current client
// 2. Publish a ClientChangedEvent
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
// Publish ClientRegisterServiceEvent and InstanceMetadataEvent
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
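- Bind the service -> instance relation to the current client
- Publish a ClientChangedEvent
- Publish a ClientRegisterServiceEvent
- Publish an InstanceMetadataEvent
Event handling flow
ClientChangedEvent: "Client changed event. Happened when Client add or remove service." It is handled by DistroClientDataProcessor, which synchronizes the client data to all server nodes.
ClientRegisterServiceEvent: "Client register service event." It is handled by ClientServiceIndexesManager, which maintains the registration and subscription relations between clientId and service. This handler also publishes a ServiceChangedEvent.
InstanceMetadataEvent: the instance metadata event. It is handled by NamingMetadataManager, which manages the service and instance metadata registered by clients. An InstanceMetadataEvent triggers this handler's instance expiry check.
ServiceChangedEvent: "Service data changed event." It has two handlers:
- NamingSubscriberServiceV2Impl: triggers the task that calls back the service's subscribers
- DoubleWriteEventListener: triggers the task that synchronizes the service information to other Nacos nodes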
Service deregistration
Core flow of service deregistration
public void deregisterInstance(Service service, Instance instance, String clientId) {
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
// Remove service instance from client
// 1. Remove the service -> instance relation from the current client
// 2. Publish a ClientChangedEvent
InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
client.setLastUpdatedTime();
// Publish ClientDeregisterServiceEvent and InstanceMetadataEvent
if (null != removedInstance) {
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(
new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
}
}
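- Remove the service -> instance relation from the current client
- Publish a ClientChangedEvent
- Publish a ClientDeregisterServiceEvent
- Publish an InstanceMetadataEvent
Event handling flow
This is essentially the same as the service registration flow.
ClientChangedEvent: "Client changed event. Happened when Client add or remove service." It is handled by DistroClientDataProcessor, which synchronizes the client data to all server nodes.
ClientDeregisterServiceEvent: "Client deregister service event." It is handled by ClientServiceIndexesManager, which maintains the registration and subscription relations between clientId and service. This handler also publishes a ServiceChangedEvent.
InstanceMetadataEvent: the instance metadata event. It is handled by NamingMetadataManager, which manages the service and instance metadata registered by clients. An InstanceMetadataEvent triggers this handler's instance expiry check.
ServiceChangedEvent: "Service data changed event." It has two handlers:
- NamingSubscriberServiceV2Impl: triggers the task that calls back the service's subscribers
- DoubleWriteEventListener: triggers the task that synchronizes the service information to other Nacos nodes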
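Service instance heartbeat
- The client periodically sends healthCheck requests.
- Every time the server receives a request from a client, it updates the last-active timestamp of the corresponding connection.
- The server also periodically checks each client connection's last-active timestamp and the number of connections per client IP. If a connection has been inactive for longer than a threshold, the server sends a detection request to the client; if the number of connections exceeds the limit, the surplus connections are reset.
Client healthCheck requests
The client periodically sends healthCheck requests, every 5 seconds by default. The loop lives in RpcClient: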
clientEventExecutor.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
//check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
boolean isHealthy = healthCheck();
if (!isHealthy) {
if (currentConnection == null) {
continue;
}
RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
break;
}
// ...
The healthCheck method:
private boolean healthCheck() {
HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
try {
Response response = this.currentConnection.request(healthCheckRequest, 3000L);
return response != null && response.isSuccess();
} catch (NacosException e) {
//ignore
}
return false;
}
If the check fails, the client re-establishes the connection.
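The decision made on each pass of the keep-alive loop can be summarized in a tiny pure function. This is a sketch only: the action labels WAIT, KEEP and RECONNECT are hypothetical names introduced here, and the 5000 ms used in the example call simply mirrors the 5-second default mentioned above.

```java
class KeepAliveSketch {
    // True when the connection has been idle for at least keepAliveMillis.
    static boolean needsHealthCheck(long nowMillis, long lastActiveMillis, long keepAliveMillis) {
        return nowMillis - lastActiveMillis >= keepAliveMillis;
    }

    // What the loop does next, given whether the connection is idle and the probe result.
    static String nextAction(boolean idle, boolean healthy) {
        if (!idle) {
            return "WAIT";
        }
        return healthy ? "KEEP" : "RECONNECT";
    }

    public static void main(String[] args) {
        boolean idle = needsHealthCheck(10_000L, 4_000L, 5_000L);
        System.out.println(nextAction(idle, false));
    }
}
```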
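Server records the connection's last-active timestamp
Every time the server receives a request from a client, it updates the last-active timestamp of the corresponding connection.
The server uses GrpcRequestAcceptor as the entry point for business-layer requests. This class converts gRPC requests into business-layer requests and forwards them to the matching RequestHandler.
Its request method refreshes the last-active timestamp of the corresponding connection: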
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// Refresh the connection's last-active timestamp
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
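Server-side connection liveness check
The server periodically checks each client connection's last-active timestamp and the number of connections per client IP. If a connection has been inactive for longer than a threshold, the server sends a detection request to the client; if the number of connections exceeds the limit, the surplus connections are reset.
The server manages connections with ConnectionManager:
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
At startup it creates a periodic task that checks the liveness of the connections, running every 3 seconds by default. A code excerpt follows:
// Find connections that have been inactive for too long and connections that exceed the connection limit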
for (Map.Entry<String, Connection> entry : entries) {
Connection client = entry.getValue();
String clientIp = client.getMetaInfo().getClientIp();
AtomicInteger integer = expelForIp.get(clientIp);
if (integer != null && integer.intValue() > 0) {
integer.decrementAndGet();
expelClient.add(client.getMetaInfo().getConnectionId());
expelCount--;
} else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
outDatedConnections.add(client.getMetaInfo().getConnectionId());
}
}
// ...
// Reset the connections that exceed the connection limit
for (String expelledClientId : expelClient) {
try {
Connection connection = getConnection(expelledClientId);
if (connection != null) {
ConnectResetRequest connectResetRequest = new ConnectResetRequest();
connectResetRequest.setServerIp(serverIp);
connectResetRequest.setServerPort(serverPort);
connection.asyncRequest(connectResetRequest, null);
}
} catch (ConnectionAlreadyClosedException e) {
unregister(expelledClientId);
} catch (Exception e) {
}
}
// ...
if (CollectionUtils.isNotEmpty(outDatedConnections)) {
Set<String> successConnections = new HashSet<>();
final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
for (String outDateConnectionId : outDatedConnections) {
try {
Connection connection = getConnection(outDateConnectionId);
if (connection != null) {
// Send a detection request to the client
ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public long getTimeout() {
return 1000L;
}
@Override
public void onResponse(Response response) {
latch.countDown();
if (response != null && response.isSuccess()) {
connection.freshActiveTime();
successConnections.add(outDateConnectionId);
}
}
@Override
public void onException(Throwable e) {
latch.countDown();
}
});
} else {
latch.countDown();
}
} catch (ConnectionAlreadyClosedException e) {
latch.countDown();
} catch (Exception e) {
latch.countDown();
}
}
latch.await(3000L, TimeUnit.MILLISECONDS);
// Remove the connections whose detection failed, treating them as disconnected
for (String outDateConnectionId : outDatedConnections) {
if (!successConnections.contains(outDateConnectionId)) {
unregister(outDateConnectionId);
}
}
}
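The first loop of the excerpt sorts connections into two buckets: those to expel because their IP has too many connections, and those that look outdated. The sketch below reproduces only that classification step. It is a simplification under assumptions: Conn is a hypothetical stand-in for Connection, the per-IP expel quota is passed in as a map, and the 20-second KEEP_ALIVE_MILLIS is a value assumed for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConnectionSweepSketch {
    static final long KEEP_ALIVE_MILLIS = 20_000L; // assumed value for this sketch

    // Hypothetical stand-in for a server-side connection
    static class Conn {
        final String id;
        final String ip;
        final long lastActive;

        Conn(String id, String ip, long lastActive) {
            this.id = id;
            this.ip = ip;
            this.lastActive = lastActive;
        }
    }

    final List<String> expel = new ArrayList<>();
    final List<String> outdated = new ArrayList<>();

    // expelForIp: how many connections of each IP should be reset
    void sweep(List<Conn> connections, Map<String, Integer> expelForIp, long now) {
        Map<String, Integer> remaining = new HashMap<>(expelForIp);
        for (Conn conn : connections) {
            int toExpel = remaining.getOrDefault(conn.ip, 0);
            if (toExpel > 0) {
                remaining.put(conn.ip, toExpel - 1);
                expel.add(conn.id);
            } else if (now - conn.lastActive >= KEEP_ALIVE_MILLIS) {
                outdated.add(conn.id);
            }
        }
    }
}
```

As in the excerpt, a connection chosen for expulsion is not also checked for inactivity; the two branches are mutually exclusive.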
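Client disconnection
Business handling flow
After the gRPC connection layer detects that a connection has been closed, it triggers the transportTerminated event of GrpcServer: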
public void transportTerminated(Attributes transportAttrs) {
String connectionId = null;
try {
connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
} catch (Exception e) {
// Ignore
}
if (StringUtils.isNotBlank(connectionId)) {
// Remove the connection through ConnectionManager
connectionManager.unregister(connectionId);
}
}
ConnectionManager removes the connection:
public synchronized void unregister(String connectionId) {
// Remove the connection from the connection map
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
// Decrement the connection count for this IP
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count <= 0) {
connectionForClientIp.remove(clientIp);
}
}
remove.close();
// Notify the ClientManager layer that the client has disconnected
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
}
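The per-IP bookkeeping in unregister can be shown on its own. The following sketch assumes a simplified model in which a connection is just an id mapped to a client IP; the register method and the class name are additions for the example and are not taken from the Nacos source.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IpConnectionCounterSketch {
    // connectionId -> clientIp
    private final Map<String, String> connectionIp = new ConcurrentHashMap<>();
    // clientIp -> number of live connections
    private final Map<String, AtomicInteger> countForIp = new ConcurrentHashMap<>();

    synchronized void register(String connectionId, String clientIp) {
        if (connectionIp.putIfAbsent(connectionId, clientIp) == null) {
            countForIp.computeIfAbsent(clientIp, ip -> new AtomicInteger()).incrementAndGet();
        }
    }

    // Removes the connection and decrements its IP counter; drops the counter when it reaches zero.
    synchronized boolean unregister(String connectionId) {
        String clientIp = connectionIp.remove(connectionId);
        if (clientIp == null) {
            return false;
        }
        AtomicInteger counter = countForIp.get(clientIp);
        if (counter != null && counter.decrementAndGet() <= 0) {
            countForIp.remove(clientIp);
        }
        return true;
    }

    int countFor(String clientIp) {
        AtomicInteger counter = countForIp.get(clientIp);
        return counter == null ? 0 : counter.get();
    }
}
```

Unregistering an unknown or already removed connection is a no-op, which keeps the counter from going negative when both the liveness check and transportTerminated try to remove the same connection.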
The clientDisconnected method of ConnectionBasedClientManager:
public boolean clientDisconnected(String clientId) {
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
// Publish a ClientDisconnectEvent
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
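Event handling flow
ClientDisconnectEvent: "Client disconnect event. Happened when Client disconnect with server." It is handled by:
- ClientServiceIndexesManager: maintains the registration and subscription relations
- DistroClientDataProcessor: synchronizes the client data to all server nodes
- NamingMetadataManager: maintains the service and instance metadata registered by clients
Querying instances
The ServiceQueryRequestHandler handler
The ServiceQueryRequestHandler class handles the client's service instance query requests: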
public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
private final ServiceStorage serviceStorage;
private final NamingMetadataManager metadataManager;
public ServiceQueryRequestHandler(ServiceStorage serviceStorage,
NamingMetadataManager metadataManager) {
this.serviceStorage = serviceStorage;
this.metadataManager = metadataManager;
}
@Override
@Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
public QueryServiceResponse handle(
ServiceQueryRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String groupName = request.getGroupName();
String serviceName = request.getServiceName();
Service service = Service.newService(namespaceId, groupName, serviceName);
String cluster = null == request.getCluster() ? "" : request.getCluster();
boolean healthyOnly = request.isHealthyOnly();
// ServiceInfo wraps the basic service information and its set of instances
ServiceInfo result = serviceStorage.getData(service);
ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
result = ServiceUtil
.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true);
return QueryServiceResponse.buildSuccessResponse(result);
}
}
Querying the service instances:
public ServiceInfo getData(Service service) {
// If the cache already holds the service information, return it directly
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}
public ServiceInfo getPushData(Service service) {
ServiceInfo result = emptyServiceInfo(service);
if (!ServiceManager.getInstance().containSingleton(service)) {
return result;
}
// Look up the instances through ClientServiceIndexesManager
result.setHosts(getAllInstancesFromIndex(service));
serviceDataIndexes.put(service, result);
return result;
}
private List<Instance> getAllInstancesFromIndex(Service service) {
Set<Instance> result = new HashSet<>();
Set<String> clusters = new HashSet<>();
// Look up the set of clients bound to the service in ClientServiceIndexesManager
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
// Look up the instance information registered by this client
Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
if (instancePublishInfo.isPresent()) {
Instance instance = parseInstance(service, instancePublishInfo.get());
result.add(instance);
clusters.add(instance.getClusterName());
}
}
// cache clusters of this service
serviceClusterIndex.put(service, clusters);
return new LinkedList<>(result);
}
private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
// Get the client object
Client client = clientManager.getClient(clientId);
if (null == client) {
return Optional.empty();
}
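// Look up the instance information this client registered for the given service
// AbstractClient stores it in a Map<Service, InstancePublishInfo>
// As described earlier, client.addServiceInstance adds the entry during service registration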
return Optional.ofNullable(client.getInstancePublishInfo(service));
}
As described earlier, the ClientServiceIndexesManager class maintains the registration and subscription relations between clientId and service.
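The two-step lookup used by getAllInstancesFromIndex (service to client ids, then client to its published instance) can be sketched with plain maps. This is an illustration under assumptions: services and instances are plain strings, each client publishes at most one instance per service as in the Map<Service, InstancePublishInfo> mentioned above, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ServiceIndexSketch {
    // service -> ids of the clients that registered an instance of it
    private final Map<String, Set<String>> publisherIndexes = new HashMap<>();
    // clientId -> (service -> instance address)
    private final Map<String, Map<String, String>> clients = new HashMap<>();

    void register(String clientId, String service, String address) {
        clients.computeIfAbsent(clientId, k -> new HashMap<>()).put(service, address);
        publisherIndexes.computeIfAbsent(service, k -> new LinkedHashSet<>()).add(clientId);
    }

    // Step 1: find the clients bound to the service. Step 2: read each client's instance.
    List<String> instancesOf(String service) {
        List<String> result = new ArrayList<>();
        Set<String> clientIds = publisherIndexes.getOrDefault(service, Collections.<String>emptySet());
        for (String clientId : clientIds) {
            Map<String, String> published = clients.get(clientId);
            if (published != null && published.containsKey(service)) {
                result.add(published.get(service));
            }
        }
        return result;
    }
}
```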
Service subscription
The SubscribeServiceRequestHandler handler
The SubscribeServiceRequestHandler class handles the client's service subscription requests:
public class SubscribeServiceRequestHandler extends
RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {
private final ServiceStorage serviceStorage;
private final NamingMetadataManager metadataManager;
private final EphemeralClientOperationServiceImpl clientOperationService;
public SubscribeServiceRequestHandler(ServiceStorage serviceStorage,
NamingMetadataManager metadataManager,
EphemeralClientOperationServiceImpl clientOperationService) {
this.serviceStorage = serviceStorage;
this.metadataManager = metadataManager;
this.clientOperationService = clientOperationService;
}
@Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
public SubscribeServiceResponse handle(
SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
String namespaceId = request.getNamespace();
String serviceName = request.getServiceName();
String groupName = request.getGroupName();
String app = request.getHeader("app", "unknown");
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
Service service = Service.newService(namespaceId, groupName, serviceName, true);
// Build the Subscriber object: client IP, version, namespace, and so on
Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app,
meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());
ServiceInfo serviceInfo = handleClusterData(serviceStorage.getData(service),
metadataManager.getServiceMetadata(service).orElse(null),
subscriber);
if (request.isSubscribe()) {
// Subscribe
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
} else {
// Unsubscribe
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
}
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
private ServiceInfo handleClusterData(
ServiceInfo data, ServiceMetadata metadata, Subscriber subscriber) {
return StringUtils.isBlank(subscriber.getCluster()) ? data
: ServiceUtil.selectInstancesWithHealthyProtection(data, metadata, subscriber.getCluster());
}
}
Core flow of service subscription
public void subscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
// Bind the service -> subscriber relation to this client
client.addServiceSubscriber(singleton, subscriber);
client.setLastUpdatedTime();
// Publish a ClientSubscribeServiceEvent
NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
Event handling flow
ClientSubscribeServiceEvent: "Client subscribe service event." It is handled by ClientServiceIndexesManager, which maintains the registration and subscription relations between clientId and service:
private void addSubscriberIndexes(Service service, String clientId) {
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
// Publish a ServiceSubscribedEvent
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
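The "only the first add notifies" rule relies on Set.add returning true only when the element was not already present. A minimal sketch of that pattern follows; services and client ids are plain strings, and a counter stands in for publishing ServiceSubscribedEvent. The class name is hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SubscriberIndexSketch {
    // service -> ids of the clients subscribed to it
    private final Map<String, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
    int subscribedEvents = 0;

    void addSubscriber(String service, String clientId) {
        Set<String> clientIds =
                subscriberIndexes.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet());
        // Set.add returns true only when the client was not subscribed before
        if (clientIds.add(clientId)) {
            subscribedEvents++; // stands in for publishing ServiceSubscribedEvent
        }
    }
}
```

A client that subscribes to the same service twice therefore triggers a single initial push.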
ServiceSubscribedEvent: "Service is subscribed by one client event." It is handled by NamingSubscriberServiceV2Impl:
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// If service changed, push to all subscribers.
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(
service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// Trigger one subscriber callback, pushing the subscribed service's information to the subscriber
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(
service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
Cancelling a service subscription
public void unsubscribeService(Service service, Subscriber subscriber, String clientId) {
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
Client client = clientManager.getClient(clientId);
client.removeServiceSubscriber(singleton);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId));
}
This publishes a ClientUnsubscribeServiceEvent, which is again handled by ClientServiceIndexesManager to remove the subscription relation.