Nacos源碼 (3) 註冊中心

来源:https://www.cnblogs.com/xugf/archive/2023/08/14/17627742.html
-Advertisement-
Play Games

本文將從一個服務註冊示例入手,通過閱讀客戶端、服務端源碼,分析服務註冊、服務發現原理。 使用的2.0.2的版本。 # 客戶端 ## 創建NacosNamingService對象 ```java NacosNamingService nacosNamingService = new NacosNami ...


本文將從一個服務註冊示例入手,通過閱讀客戶端、服務端源碼,分析服務註冊、服務發現原理。

使用的2.0.2的版本。

客戶端

創建NacosNamingService對象

NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);

NacosNamingService提供兩個構造方法:

public NacosNamingService(String serverList) throws NacosException {
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
    init(properties);
}

public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}

第二個方法的properties的key在PropertyKeyConst常量類可以找到,如:

  • namespace
  • username
  • password
  • serverAddr
  • clusterName
  • 其他

構造方法中會初始化一些參數和組件:

  • 初始化namespace參數

  • 創建InstancesChangeNotifier對象,它實現了Subscriber介面,監聽InstancesChangeEvent事件

    public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    
        // key使用serviceName + groupName + clusters組合而成
        // value是監聽器集合
        private final Map<String, ConcurrentHashSet<EventListener>> listenerMap;
    
        // 鎖
        private final Object lock = new Object();
    
  • 向NotifyCenter註冊InstancesChangeEvent事件,註冊之前創建的InstancesChangeNotifier對象監聽服務實例變化

    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    
    // NotifyCenter維護著EventPublisher集,Subscriber會被註冊到EventPublisher上
    // EventPublisher提供publish方法向Event隊列推送事件
    // EventPublisher是一個Thread類,run方法從Event隊列取事件通知Subscriber來處理
    
  • 創建NamingClientProxyDelegate對象,用於與服務端通信,它是一個代理,內部使用其他的NamingClientProxy實現:

    • NamingHttpClientProxy
    • NamingGrpcClientProxy - 預設使用該實現類,其中有healthCheck檢測服務端是否健康,服務端直接響應成功無操作

服務註冊

NacosNamingService nacosNamingService = new NacosNamingService(NACOS_HOST);
nacosNamingService.registerInstance(ORDER_SERVICE, "192.168.0.100", 9999);

提供多個重載的registerInstance方法,最終使用這個方法:

public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
            throws NacosException {
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setWeight(1.0);
    instance.setClusterName(clusterName);
    registerInstance(serviceName, groupName, instance);
}

public void registerInstance(String serviceName, String groupName, Instance instance)
            throws NacosException {
    // 此處clientProxy是NamingClientProxyDelegate對象
    clientProxy.registerService(serviceName, groupName, instance);
}

NamingClientProxyDelegate的registerService方法會選擇一個具體的NamingClientProxy對象與服務端通信,預設使用NamingGrpcClientProxy對象。

NamingGrpcClientProxy的registerService方法構建InstanceRequest請求對象,之後使用RpcClient對象發送請求並接收響應。

RpcClient內部通過GrpcConnection對象使用GRPC來訪問服務端。

內部的GRPC代碼是使用protoc和protobuf-maven-plugin生成的,通信細節此處不做介紹。

服務下線

nacosNamingService.deregisterInstance(ORDER_SERVICE, "192.168.0.100", 9999);

deregisterInstance服務下線:

public void deregisterInstance(String serviceName,
                               String groupName,
                               String ip,
                               int port,
                               String clusterName) throws NacosException {
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setClusterName(clusterName);
    deregisterInstance(serviceName, groupName, instance);
}

public void deregisterInstance(String serviceName,
                               String groupName,
                               Instance instance) throws NacosException {
    clientProxy.deregisterService(serviceName, groupName, instance);
}

查詢實例

示例代碼:

NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
List<Instance> instances = namingService.getAllInstances(ORDER_SERVICE, true);

System.out.printf(">> instance count=%d\n", instances.size());

for (Instance instance : instances) {
    System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
            instance.getServiceName(), instance.getInstanceId(),
            instance.getClusterName(), instance.getIp(), instance.getPort());
}

提供了幾個重載的getAllInstances方法,最重要的參數就是subscribe,當為true時,會向服務端發送訂閱請求,之後一直從ServiceInfoHolder中獲取服務實例信息,而不再向服務端發送查詢請求。

public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    if (subscribe) {
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
        if (null == serviceInfo) {
            // 訂閱請求
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        // 查詢請求
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

服務訂閱

示例代碼:

NacosNamingService namingService = new NacosNamingService(NACOS_HOST);
namingService.subscribe(ORDER_SERVICE, new EventListener() {
    @Override
    public void onEvent(Event event) {
        NamingEvent e = (NamingEvent) event;
        System.out.println("serviceName=" + e.getServiceName());
        List<Instance> instances = e.getInstances();
        System.out.printf(">> instance count=%d\n", instances.size());

        for (Instance instance : instances) {
            System.out.printf(">> serviceName=%s, id=%s, cluster=%s, ip=%s, port=%s\n",
                    instance.getServiceName(), instance.getInstanceId(),
                    instance.getClusterName(), instance.getIp(), instance.getPort());
        }
    }
});

TimeUnit.SECONDS.sleep(1200);

subscribe方法:

public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    String clusterString = StringUtils.join(clusters, ",");
    // 將listener保存到listenerMap中
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    // 發送訂閱請求
    clientProxy.subscribe(serviceName, groupName, clusterString);
}

實例變化的方法調用棧:

當收到服務端的實例變化事件時,會觸發grpc層的觀察者監聽:

public void onMessage(RespT message) {
  if (firstResponseReceived && !streamingResponse) {
    throw Status.INTERNAL
        .withDescription("More than one responses received for unary or client-streaming call")
        .asRuntimeException();
  }
  firstResponseReceived = true;
  // 調用觀察者
  observer.onNext(message);

  if (streamingResponse && adapter.autoFlowControlEnabled) {
    // Request delivery of the next inbound message.
    adapter.request(1);
  }
}

此處的observer是在創建rpc連接的時候註冊的:

private StreamObserver<Payload> bindRequestStream(
        final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
        final GrpcConnection grpcConn) {

    return streamStub.requestBiStream(new StreamObserver<Payload>() {

        @Override
        public void onNext(Payload payload) {
            try {
                Object parseBody = GrpcUtils.parse(payload);
                final Request request = (Request) parseBody;
                if (request != null) {
                    try {
                        // 調用ServerRequestHandler處理請求
                        Response response = handleServerRequest(request);
                        if (response != null) {
                            response.setRequestId(request.getRequestId());
                            sendResponse(response);
                        }

// ...

NamingPushRequestHandler的處理邏輯:

public Response requestReply(Request request) {
    if (request instanceof NotifySubscriberRequest) {
        NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;
        serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());
        return new NotifySubscriberResponse();
    }
    return null;
}

serviceInfoHolder.processServiceInfo方法:

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (isEmptyOrErrorPush(serviceInfo)) {
        //empty or error push, just ignore
        return oldService;
    }
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        // 推送一個InstancesChangeEvent事件
        NotifyCenter.publishEvent(new InstancesChangeEvent(
                serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}

推送一個InstancesChangeEvent事件:

  1. NotifyCenter維護著一個EventPublisher集,當有事件時,會選擇一個目標EventPublisher

  2. 通過publish方法將事件保存到一個Event隊列

    public boolean publish(Event event) {
        checkIsStart();
        boolean success = this.queue.offer(event);
        if (!success) {
            // 當隊列操作失敗時,直接使用當前線程處理事件
            receiveEvent(event);
            return true;
        }
        return true;
    }
    
  3. EventPublisher是一個線程,在NotifyCenter初始化時啟動。run方法會從Event隊列取事件,使用receiveEvent(event)進行處理

  4. receiveEvent方法查找所有的Subscriber,其中就有最初創建的InstancesChangeNotifier,調用訂閱者onEvent方法

服務端

服務註冊

InstanceRequestHandler處理器

註冊中心的rpc處理器在com.alibaba.nacos.naming.remote.rpc.handler包,處理服務註冊和下線的處理器是InstanceRequestHandler類:

public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }

    @Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            // 服務註冊
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            // 服務下線
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }

    // 服務註冊
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

    // 服務下線
    private InstanceResponse deregisterInstance(
            Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
}

服務註冊核心流程

public void registerInstance(Service service, Instance instance, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    Client client = clientManager.getClient(clientId);
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    // Add a new instance for service for current client
    // 1. 給當前客戶端綁定service -> instance關係
    // 2. 推送一個ClientChangedEvent事件
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();

    // 推送ClientRegisterServiceEvent和InstanceMetadataEvent事件
    NotifyCenter.publishEvent(
        new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter.publishEvent(
        new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
  1. 給當前客戶端綁定service -> instance關係
  2. 推送一個ClientChangedEvent事件
  3. 推送ClientRegisterServiceEvent事件
  4. 推送InstanceMetadataEvent事件

事件處理流程

ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 會由DistroClientDataProcessor進行處理,同步客戶端數據到所有服務節點

ClientRegisterServiceEvent事件:Client register service event. 由ClientServiceIndexesManager進行處理,ClientServiceIndexesManager類維護clientId與service的註冊關係和訂閱關係。另外該處理器會推送一個ServiceChangedEvent事件。

InstanceMetadataEvent事件:實例元數據事件。由NamingMetadataManager進行處理,NamingMetadataManager管理客戶端註冊的服務和實例元數據信息。InstanceMetadataEvent事件會觸發該處理器的實例過期判斷

ServiceChangedEvent事件:Service data changed event. 有兩個處理器:

  • NamingSubscriberServiceV2Impl - 觸發回調服務訂閱者任務
  • DoubleWriteEventListener - 觸發將服務信息同步到其他nacos節點任務

服務下線

服務下線核心流程

public void deregisterInstance(Service service, Instance instance, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    Client client = clientManager.getClient(clientId);
    // Remove service instance from client
    // 1. 解除當前客戶端的service -> instance關係
    // 2. 推送一個ClientChangedEvent事件
    InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
    client.setLastUpdatedTime();

    // 推送ClientDeregisterServiceEvent和InstanceMetadataEvent事件
    if (null != removedInstance) {
        NotifyCenter.publishEvent(
            new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
        NotifyCenter.publishEvent(
            new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
    }
}
  1. 解除當前客戶端的service -> instance關係
  2. 推送一個ClientChangedEvent事件
  3. 推送ClientDeregisterServiceEvent事件
  4. 推送InstanceMetadataEvent事件

事件處理流程

基本與服務註冊流程相同。

ClientChangedEvent事件:Client changed event. Happened when Client add or remove service. 會由DistroClientDataProcessor進行處理,同步客戶端數據到所有服務節點

ClientDeregisterServiceEvent事件:Client deregister service event. 由ClientServiceIndexesManager進行處理,ClientServiceIndexesManager類維護clientId與service的註冊關係和訂閱關係。另外該處理器會推送一個ServiceChangedEvent事件。

InstanceMetadataEvent事件:實例元數據事件。由NamingMetadataManager進行處理,NamingMetadataManager管理客戶端註冊的服務和實例元數據信息。InstanceMetadataEvent事件會觸發該處理器的實例過期判斷

ServiceChangedEvent事件:Service data changed event. 有兩個處理器:

  • NamingSubscriberServiceV2Impl - 觸發回調服務訂閱者任務
  • DoubleWriteEventListener - 觸發將服務信息同步到其他nacos節點任務

服務實例心跳

  1. 客戶端會周期性的發送healthCheck請求
  2. 服務端每次收到客戶端請求時都會更新對應connection的活躍時間戳
  3. 服務端也會周期性的檢查客戶端connection的活躍時間戳和客戶端IP連接數,當超過一定的時間不活躍,服務端會發一個檢測請求給客戶端,當連接數超過閾值時將重置多餘的連接

客戶端healthCheck請求

客戶端會周期性發送healthCheck請求,預設每5秒執行一次,在RpcClient中:

clientEventExecutor.submit(new Runnable() {
    @Override
    public void run() {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    //check alive time.
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                        boolean isHealthy = healthCheck();
                        if (!isHealthy) {
                            if (currentConnection == null) {
                                continue;
                            }
                            
                            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                break;
                            }
// ...

healthCheck健康檢查:

private boolean healthCheck() {
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    try {
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        return response != null && response.isSuccess();
    } catch (NacosException e) {
        //ignore
    }
    return false;
}

如果檢查失敗,將重新建立連接。

服務端記錄connection活躍時間戳

服務端每次收到客戶端請求時都會更新對應connection的活躍時間戳。

服務端使用GrpcRequestAcceptor作為業務層請求Acceptor入口,這個類會將GRPC的請求轉為業務層請求,並轉發到對應的RequestHandler處理器。

在其request方法中,會刷新對應connection的活躍時間戳:

Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
// 刷新connection的活躍時間戳
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();

服務端connection活躍檢查

服務端周期性檢查客戶端connection的活躍時間戳和客戶端IP連接數,當超過一定的時間不活躍,服務端會發一個檢測請求給客戶端,當連接數超過閾值時將重置多餘的連接。

服務端使用ConnectionManager管理連接:

Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

在啟動時,會創建周期性任務檢查connections的活躍狀態,預設每3秒執行一次,以下為代碼片段:

// 檢查長時間不活躍的連接和超過最大連接數的連接
for (Map.Entry<String, Connection> entry : entries) {
    Connection client = entry.getValue();
    String clientIp = client.getMetaInfo().getClientIp();
    AtomicInteger integer = expelForIp.get(clientIp);
    if (integer != null && integer.intValue() > 0) {
        integer.decrementAndGet();
        expelClient.add(client.getMetaInfo().getConnectionId());
        expelCount--;
    } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
        outDatedConnections.add(client.getMetaInfo().getConnectionId());
    }
}

// ...

// 重置超過最大連接數的連接
for (String expelledClientId : expelClient) {
    try {
        Connection connection = getConnection(expelledClientId);
        if (connection != null) {
            ConnectResetRequest connectResetRequest = new ConnectResetRequest();
            connectResetRequest.setServerIp(serverIp);
            connectResetRequest.setServerPort(serverPort);
            connection.asyncRequest(connectResetRequest, null);
        }
    } catch (ConnectionAlreadyClosedException e) {
        unregister(expelledClientId);
    } catch (Exception e) {

    }
}

// ...

if (CollectionUtils.isNotEmpty(outDatedConnections)) {
    Set<String> successConnections = new HashSet<>();
    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
    for (String outDateConnectionId : outDatedConnections) {
        try {
            Connection connection = getConnection(outDateConnectionId);
            if (connection != null) {
                // 給客戶端發檢測請求
                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                    @Override
                    public Executor getExecutor() {
                        return null;
                    }

                    @Override
                    public long getTimeout() {
                        return 1000L;
                    }

                    @Override
                    public void onResponse(Response response) {
                        latch.countDown();
                        if (response != null && response.isSuccess()) {
                            connection.freshActiveTime();
                            successConnections.add(outDateConnectionId);
                        }
                    }

                    @Override
                    public void onException(Throwable e) {
                        latch.countDown();
                    }
                });
            } else {
                latch.countDown();
            }

        } catch (ConnectionAlreadyClosedException e) {
            latch.countDown();
        } catch (Exception e) {
            latch.countDown();
        }
    }

    latch.await(3000L, TimeUnit.MILLISECONDS);

    // 移除失敗的已斷開連接
    for (String outDateConnectionId : outDatedConnections) {
        if (!successConnections.contains(outDateConnectionId)) {
            unregister(outDateConnectionId);
        }
    }
}

客戶端斷開連接

業務處理流程

GRPC連接層檢測到連接斷開之後,會觸發GrpcServer的transportTerminated事件:

public void transportTerminated(Attributes transportAttrs) {
    String connectionId = null;
    try {
        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
    } catch (Exception e) {
        // Ignore
    }
    if (StringUtils.isNotBlank(connectionId)) {
        // 使用ConnectionManager移除連接
        connectionManager.unregister(connectionId);
    }
}

ConnectionManager移除連接:

public synchronized void unregister(String connectionId) {
    // 從Connection集移除連接
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        // IP連接數--
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        // 通知ClientManager層斷開連接
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

ConnectionBasedClientManager的clientDisconnected方法:

public boolean clientDisconnected(String clientId) {
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    // 推送一個ClientDisconnectEvent事件
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

事件處理流程

ClientDisconnectEvent事件:Client disconnect event. Happened when Client disconnect with server.

  • ClientServiceIndexesManager - 維護註冊和訂閱關係
  • DistroClientDataProcessor - 同步客戶端數據到所有服務節點
  • NamingMetadataManager - 維護客戶端註冊的服務和實例元數據信息

查詢實例

ServiceQueryRequestHandler處理器

ServiceQueryRequestHandler類負責客戶端的服務實例查詢請求:

public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
    
    private final ServiceStorage serviceStorage;
    
    private final NamingMetadataManager metadataManager;
    
    public ServiceQueryRequestHandler(ServiceStorage serviceStorage,
                                      NamingMetadataManager metadataManager) {
        this.serviceStorage = serviceStorage;
        this.metadataManager = metadataManager;
    }
    
    @Override
    @Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
    public QueryServiceResponse handle(
           ServiceQueryRequest request, RequestMeta meta) throws NacosException {

        String namespaceId = request.getNamespace();
        String groupName = request.getGroupName();
        String serviceName = request.getServiceName();
        Service service = Service.newService(namespaceId, groupName, serviceName);
        String cluster = null == request.getCluster() ? "" : request.getCluster();
        boolean healthyOnly = request.isHealthyOnly();

        // ServiceInfo封裝服務基本信息和其實例集合
        ServiceInfo result = serviceStorage.getData(service);
        ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
        result = ServiceUtil
            .selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true);

        return QueryServiceResponse.buildSuccessResponse(result);
    }
}

查詢服務實例:

public ServiceInfo getData(Service service) {
    // 如果緩存裡面有服務信息則直接從緩存查找
    return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
}

public ServiceInfo getPushData(Service service) {
    ServiceInfo result = emptyServiceInfo(service);
    if (!ServiceManager.getInstance().containSingleton(service)) {
        return result;
    }
    // 從ClientServiceIndexesManager查找
    result.setHosts(getAllInstancesFromIndex(service));
    serviceDataIndexes.put(service, result);
    return result;
}

private List<Instance> getAllInstancesFromIndex(Service service) {
    Set<Instance> result = new HashSet<>();
    Set<String> clusters = new HashSet<>();
    // 從ClientServiceIndexesManager查找service綁定的client集
    for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
        // 查找該client註冊的實例信息
        Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
        if (instancePublishInfo.isPresent()) {
            Instance instance = parseInstance(service, instancePublishInfo.get());
            result.add(instance);
            clusters.add(instance.getClusterName());
        }
    }
    // cache clusters of this service
    serviceClusterIndex.put(service, clusters);
    return new LinkedList<>(result);
}

private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
    // 獲取到client對象
    Client client = clientManager.getClient(clientId);
    if (null == client) {
        return Optional.empty();
    }
    // 查找該client指定service註冊的實例信息
    // AbstractClient使用Map<Service, InstancePublishInfo>結構保存
    // 前文介紹過在服務註冊時會使用client.addServiceInstance方法添加註冊信息
    return Optional.ofNullable(client.getInstancePublishInfo(service));
}

前文介紹過ClientServiceIndexesManager類維護clientId與service的註冊關係和訂閱關係。

服務訂閱

SubscribeServiceRequestHandler處理器

SubscribeServiceRequestHandler類負責客戶端的服務訂閱請求:

public class SubscribeServiceRequestHandler extends 
             RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {

    private final ServiceStorage serviceStorage;

    private final NamingMetadataManager metadataManager;

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public SubscribeServiceRequestHandler(ServiceStorage serviceStorage,
            NamingMetadataManager metadataManager,
            EphemeralClientOperationServiceImpl clientOperationService) {
        this.serviceStorage = serviceStorage;
        this.metadataManager = metadataManager;
        this.clientOperationService = clientOperationService;
    }

    @Secured(action = ActionTypes.READ, parser = NamingResourceParser.class)
    public SubscribeServiceResponse handle(
           SubscribeServiceRequest request, RequestMeta meta) throws NacosException {

        String namespaceId = request.getNamespace();
        String serviceName = request.getServiceName();
        String groupName = request.getGroupName();
        String app = request.getHeader("app", "unknown");
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        Service service = Service.newService(namespaceId, groupName, serviceName, true);

        // 封裝Subscriber對象:客戶端IP、版本、命名空間等
        Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app,
                meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());

        ServiceInfo serviceInfo = handleClusterData(serviceStorage.getData(service),
                metadataManager.getServiceMetadata(service).orElse(null),
                subscriber);

        if (request.isSubscribe()) {
            // 服務訂閱
            clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
        } else {
            // 取消訂閱
            clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
        }
        return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
    }

    private ServiceInfo handleClusterData(
            ServiceInfo data, ServiceMetadata metadata, Subscriber subscriber) {
        return StringUtils.isBlank(subscriber.getCluster()) ? data
                : ServiceUtil.selectInstancesWithHealthyProtection(data, metadata, subscriber.getCluster());
    }
}

服務訂閱核心流程

public void subscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    // 為該client綁定service -> subscriber關係
    client.addServiceSubscriber(singleton, subscriber);
    client.setLastUpdatedTime();
    // 推送一個ClientSubscribeServiceEvent事件
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

事件處理流程

ClientSubscribeServiceEvent事件:Client subscribe service event. 由ClientServiceIndexesManager進行處理,ClientServiceIndexesManager類維護clientId與service的註冊關係和訂閱關係

private void addSubscriberIndexes(Service service, String clientId) {
    subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
    // Only first time add need notify event.
    if (subscriberIndexes.get(service).add(clientId)) {
        // 推送一個ServiceSubscribedEvent事件
        NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
    }
}

ServiceSubscribedEvent事件:Service is subscribed by one client event. NamingSubscriberServiceV2Impl進行處理。

public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        // If service changed, push to all subscribers.
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        delayTaskEngine.addTask(
            service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
    } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        // 觸發一次訂閱者回調,把被訂閱的服務的信息推送給訂閱者
        ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
        Service service = subscribedEvent.getService();
        delayTaskEngine.addTask(
            service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                subscribedEvent.getClientId()));
    }
}

取消服務訂閱

public void unsubscribeService(Service service, Subscriber subscriber, String clientId) {
    Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
    Client client = clientManager.getClient(clientId);
    client.removeServiceSubscriber(singleton);
    client.setLastUpdatedTime();
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId));
}

推送一個ClientUnsubscribeServiceEvent事件,還是使用ClientServiceIndexesManager來處理,移除訂閱關係。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • # Linux 之 shell 編程學習筆記(並不完全正確,有誤請指正) ## 概念性知識點 ### 腳本概念 >**腳本(Script),是使用一種特定的描述性語言,依據一定的格式編寫的 可執行文件** ### 運行腳本要求 >**腳本須有 ==可執行== 許可權,即 ==x== 許可權** > >* ...
  • emm,又又遇到問題啦,現有業務系統應用上線存在視窗期,不能滿足正常任務迭代上線。在非視窗期上線容易導致資料庫、mq、jsf等線程中斷,進而導致需要手動修單問題。故而通過添加優雅停機功能進行優化,令其在上線前選擇優雅停機後,會優先斷掉新流量的涌入,並預留一定時間處理現存連接,最後完全下線,可有效擴大... ...
  • Lua程式設計第四版第一部分語言基礎自做練習題答案,帶:star:為重點。 ## 1.1 > 運行階乘的示例並觀察,如果輸入負數,程式會出現什麼問題?試著修改代碼來解決問題 輸入負數,程式會死迴圈,修改如下 ```lua -- 定義一個計算階乘的函數 function fact(n) if n 分別 ...
  • 日期處理相關內容之前`pandas基礎`系列中有一篇專門介紹過,本篇補充兩個常用的技巧。 # 1. 多列合併為日期 當收集來的數據中,年月日等信息分散在多個列時,往往需要先合併成日期類型,然後才能做分析處理。合併多列轉換為日期類型,可以直接用 `to_datetime`函數來處理: ```pytho ...
  • 自 2014 年發佈以來, JDK 8 一直都是相當熱門的 JDK 版本。其原因就是對底層數據結構、JVM 性能以及開發體驗做了重大升級,得到了開發人員的認可。但距離 JDK 8 發佈已經過去了 9 年,那麼這 9 年的時間,JDK 做了哪些升級?是否有新的重大特性值得我們嘗試?能否解決一些我們現在... ...
  • 隨著需求不斷迭代,業務系統的業務代碼突飛猛進,在你自豪於自己的代碼量產出很高時,有沒有回頭看看線上真正的客戶使用量又有多少呢? ...
  • ![](https://img2023.cnblogs.com/other/1218593/202308/1218593-20230814093834285-1226325272.png) Chat2DB 是一款有開源免費的多資料庫客戶端工具,支持windows、mac本地安裝,也支持伺服器端部署, ...
  • SpringSecurity組件可以為服務提供安全管理的能力,比如身份驗證、授權和針對常見攻擊的保護,是保護基於spring應用程式的事實上的標準; ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...