nacos註冊中心單節點ap架構源碼解析

来源:https://www.cnblogs.com/yuanbeier/archive/2022/12/31/17017564.html
-Advertisement-
Play Games

一、註冊流程 單nacos節點流程圖如下: 流程圖可以知,Nacos註冊流程包括客戶端的服務註冊、服務實例列表拉取、定時心跳任務;以及服務端的定時檢查服務實例任務、服務實例更新推送5個功能。 服務註冊:當客戶端啟動的時候會根據當前微服務的配置信息把微服務註冊到nacos服務端。 服務實例列表拉取:當 ...


一、註冊流程

單nacos節點流程圖如下:

流程圖可以知,Nacos註冊流程包括客戶端的服務註冊、服務實例列表拉取、定時心跳任務;以及服務端的定時檢查服務實例任務、服務實例更新推送5個功能。

服務註冊:當客戶端啟動的時候會根據當前微服務的配置信息把微服務註冊到nacos服務端。

服務實例列表拉取:當客戶端啟動的時候從nacos服務端獲取當前服務的名稱已經註冊的實例數據,並把這些實例數據緩存在客戶端的serviceInfoMap 對象中。

定時心跳任務:當客戶端向nacos服務註冊臨時實例對象的時候,會創建一個延期的任務去往服務端發送心跳信息。如果發送心跳信息成功,則又會創建一個延期任務往服務端註冊心跳信息,一直重覆該邏輯。nacos服務端接收到客戶端的心跳信息就是更新客戶端實例的最後心跳時間。該時間用來判斷實例是否健康和是否需要刪除。

定時檢查服務實例任務:nacos服務端在創建空服務對象的時候會通過線程池來定時執行檢查服務,其主要邏輯為判斷當前時間和最後心跳時間之差是否大於健康超時時間和刪除實例超時時間,如果大於,則更新實例的健康狀態和刪除當前實例。定時執行的規則為5秒之後執行檢查,並且每次執行完檢查之後,5秒之後再次執行檢查。

服務實例更新推送:當有客戶端更新實例對象時,服務端會先獲取該客戶端的服務名稱下所有已經註冊的客戶端實例,並會針每一個客戶端發送一個更新serviceinfo的udp消息,客戶端監聽收到nacos服務端發送的udp數據後進行本地緩存的更新。

二、客戶端

一、服務註冊

根據spring-cloud-starter-alibaba-nacos-discovery的spring.factories文件,找到服務註冊啟動配置類。

spring.factories文件內容為如下,

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
  com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

根據名稱判斷可以得出 NacosServiceRegistryAutoConfiguration 為服務註冊啟動配置類,源碼如下

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
		matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(),
				nacosDiscoveryProperties, context);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

關鍵類 NacosAutoServiceRegistration 的類圖結構如下

上圖可知,NacosAutoServiceRegistration 實現了 ApplicationListener介面,該監聽器會在SpringBoot啟動的時候會自動調用 onApplicationEvent方法,onApplicationEvent具體實現方法如下

public void onApplicationEvent(WebServerInitializedEvent event) {
    this.bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
        this.port.compareAndSet(0, event.getWebServer().getPort());
        // 具體的啟動方法
        this.start();
    }
}

具體的啟動方法this.start();方法的代碼如下,

public void start() {
    if (!this.isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }

    } else {
        if (!this.running.get()) {
            this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
            // 關鍵邏輯
            this.register();
            if (this.shouldRegisterManagement()) {
                this.registerManagement();
            }

            this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
            this.running.compareAndSet(false, true);
        }

    }

關鍵邏輯為this.register();方法代碼如下

protected void register() {
    if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
        log.debug("Registration disabled.");
        return;
    }
    if (this.registration.getPort() < 0) {
        this.registration.setPort(getPort().get());
    }
    super.register();
}

關鍵邏輯為super.register();方法代碼如下,

protected void register() {
    this.serviceRegistry.register(this.getRegistration());
}

關鍵邏輯為this.serviceRegistry.register方法代碼如下,

@Override
public void register(Registration registration) {

    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
	// 根據配置屬性構建NamingService對象
    NamingService namingService = namingService();
    // 獲取服務名,預設為 ${spring.application.name}
    String serviceId = registration.getServiceId();
    // 獲取組名 ,預設為 DEFAULT_GROUP
    String group = nacosDiscoveryProperties.getGroup();

    // 創建註冊實例
    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        // 發起註冊
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                 instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                  registration.toString(), e);
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}

先通過getNacosInstanceFromRegistration方法創建實例對象,getNacosInstanceFromRegistration代碼如下,

private Instance getNacosInstanceFromRegistration(Registration registration) {
    Instance instance = new Instance();
    // 獲取服務ip
    instance.setIp(registration.getHost());
    // 獲取服務
    instance.setPort(registration.getPort());
    // 獲取權重
    instance.setWeight(nacosDiscoveryProperties.getWeight());
    // 獲取集群名稱
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
  
    instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
    // 獲取元數據
    instance.setMetadata(registration.getMetadata());
    // 獲取是否為臨時實例
    instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
    return instance;
}

然後通過namingService.registerInstance方法發起註冊,registerInstance方法的代碼如下,

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 檢查 實例是否合法 
    // heart beat timeout must(預設15秒) < heart beat interval (預設5秒)拋異常
    // ip delete timeout must(預設30 秒) < heart beat interval(預設5秒)拋異常
    NamingUtils.checkInstanceIsLegal(instance);
    // 構建 groupName@@serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果是臨時實例,則創建心跳信息,定時給nacos服務發送
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
        this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
	// 向 nacos-service 註冊實例
    this.serverProxy.registerService(groupedServiceName, groupName, instance);
}

先檢查實例是否合法,然後構建服務名稱,規則為groupName@@serviceName。通過this.serverProxy.registerService方法向 nacos-service 註冊實例,代碼如下,

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,instance);

    final Map<String, String> params = new HashMap<String, String>(16);
    //設置 namespaceId
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    //設置 serviceName
    params.put(CommonParams.SERVICE_NAME, serviceName);
    //設置 groupName
    params.put(CommonParams.GROUP_NAME, groupName);
    //設置 clusterName
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
	// 調用 nacos-service 的nacosUrlInstance介面註冊實例
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}

通過向reqApi方法向nacos服務端註冊當前實例數據,其實就是向 ${spring.cloud.nacos.discovery.server-addr}/nacos/v1/ns/instance 發送POST請求。該請求地址對應的nacos服務端的源碼的naming工程中InstanceController的register方法,代碼如下,

public String register(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils
        .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
	//根據請求構建 Instance 對象
    final Instance instance = parseInstance(request);
	//註冊 Instance 對象,serviceManager對象中保存了所有的服務對象。
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

先根據請求對象構建Instance對象,然後通過serviceManager.registerInstance方法用來註冊Instance對象,registerInstance代碼如下

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
	// 如果 namespaceId 為 key 的數據為空,則創建 service ,並初始化service
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
	// 獲取 service 對象
    Service service = getService(namespaceId, serviceName);
	// 如果 service為空 則報錯
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                                 "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
	// 添加實例
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

如果 namespaceId為key的數據為空,則創建 service,並初始化service。然後調用addInstance添加實例對象,addInstance方法代碼如下,

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    throws NacosException {
	  // 根據 命名空間 和 服務名稱 構建 key
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 獲取 service
        Service service = getService(namespaceId, serviceName);
        // 同步鎖
        synchronized (service) {
            // 獲取服務下的實例集合(服務已有 + 新增的實例)
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // 根據KEY添加服務的實例
            consistencyService.put(key, instances);
        }
}

addIpAddresses方法中會調用updateIpAddresses方法,且action為 add。該方法根據action的值來獲取該服務下的最新實例集合(新增實例或刪除實例加上目前服務已有的實例數據合集)。如果action為add表示新增,則方法最後返回的集合對象中會把該服務中已有的實例集合加上新增的實例集合數據一起返回 ;如果action為 remove表示刪除,則方法最後返回的集合對象中會把該服務中已有的實例集合刪除掉需要刪除的實例集合數據。後面通過調用consistencyService.put(key, instances)方法來把updateIpAddresses方法返回的值直接添加consistencyService的實例中。updateIpAddresses方法的代碼如下,

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
    throws NacosException {
    // 從本地緩存中獲取服務的實例數據
    Datum datum = consistencyService
        .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    // 獲取 當前服務下所有的 實例
    List<Instance> currentIPs = service.allIPs(ephemeral);
    // 創建當前實例數據map
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    // 創建 當前實例Id set
    Set<String> currentInstanceIds = Sets.newHashSet();

    // 遍歷當前服務的所有實例,添加到 創建當前實例數據 map 和 當前實例Id集合
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }
    // 構造 實例集合對象的 map
    Map<String, Instance> instanceMap;
    // 如果有緩存數據
    if (datum != null && null != datum.value) {
        // 從本地緩存中以及當前服務的記憶體數據獲取最新服務的實例數據
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    }
    // 如果沒有緩存數據
    else {
        // 創建 instanceMap
        instanceMap = new HashMap<>(ips.length);
    }
    // 遍歷參數傳過來的實例對象
    for (Instance instance : ips) {
        // 如果 service 不包括 實例的 ClusterName 則創建 實例 Cluster,並初始化
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                      instance.getClusterName(), instance.toJson());
        }
        // 如果是刪除,則從 instanceMap 中 刪除 該實例
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        }
        // 如果是新增
        else {
            //獲取已存在的 實例
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                // 生成 實例 id
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            // instanceMap 添加instance實例
            instanceMap.put(instance.getDatumKey(), instance);
        }

    }
    // 如果集合小於0 ,並且是新增操作則拋異常
    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
            "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
            .toJson(instanceMap.values()));
    }
    // 返回 服務中最新的實例數據
    return new CopyOnWriteArrayList<>(instanceMap.values());
}

通過updateIpAddresses方法拿到需要更新的實例集合對象後,再通過consistencyService.put(key, instances)把拿到的實例集合對象添加到實現了PersistentConsistencyServiceDelegateImpl或者EphemeralConsistencyService介面的實例對象中,consistencyService.put(key, instances)的源碼如下,

@Override
public void put(String key, Record value) throws NacosException {
    // 根據key獲取具體的 consistencyService ,並且向其中添加具體的 key 和 value
    mapConsistencyService(key).put(key, value);
}

根據key獲取具體的 consistencyService ,並且向其中添加具體的 key 和 value。consistencyService中根據key獲取集群的實例對象(臨時服務對象EphemeralConsistencyService和持久服務對象PersistentConsistencyServiceDelegateImpl)

private ConsistencyService mapConsistencyService(String key) {
    // 根據key返回具體的服務對象
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

如果是註冊的臨時實例節點,這裡取到的是實現了ephemeralConsistencyService介面的DistroConsistencyServiceImpl 對象,它的put源碼如下:

@Override
public void put(String key, Record value) throws NacosException {
    // 添加key 和 value
    onPut(key, value);
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                        globalConfig.getTaskDispatchPeriod() / 2);
}

通過onPut方法添加key 和 value,opPut方法的代碼如下,

public void onPut(String key, Record value) {
    // 如果是臨時節點實例,則創建 Datum 並保存在 dataStore 中
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    // 如果 監聽對象不包括 key 則返回
    if (!listeners.containsKey(key)) {
        return;
    }
    // 向notifier對象添加通知任務
    notifier.addTask(key, DataOperation.CHANGE);
}

如果是臨時實例節點,則創建 Datum 並保存在 dataStore 中,然後通過notifier.addTask用來向notifier對象添加通知任務,且操作類型為DataOperation.CHANGE,addTask方法的代碼如下:

public void addTask(String datumKey, DataOperation action) {
    // 如果services包括了當前的 datumKey ,並且是修改操作 則直接返回
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    // 如果是修改操作,則向 services 添加 datumKey
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
    // 向 tasks中添加 Pair 對象
    tasks.offer(Pair.with(datumKey, action));
}

以上代碼的tasks是用來存放具體實例key和動作類型的對象,它是一個ArrayBlockingQueue對象,DistroConsistencyServiceImpl 對象的init方法代碼如下,

@PostConstruct
public void init() {
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

根據以上代碼可知,在DistroConsistencyServiceImpl 實例對象初始化之後會往GlobalExecutor線程池對象中添加了一個notifier對象。notifier對象為一個實現了Runnable 的實例。上面的代碼會執行notifier對象的run方法,notifier的run方法代碼如下:

public void run() {
    Loggers.DISTRO.info("distro notifier started");
    // 死迴圈遍歷
    for (; ; ) {
        try {
            // 獲取 tasks的數據,如果沒有數據會阻塞當前線程,直到tasks有數據為止。
            Pair<String, DataOperation> pair = tasks.take();
            // 處理數據
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

上面是一個死迴圈,tasks.take()是一個阻塞式獲取數據的方法,如果tasks沒有數據則會阻塞當前線程直到tasks.take()拿到數據,拿到數據之後會調用handle方法處理,handle代碼如下,

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();
        // 先從 services 中刪除 key
        services.remove(datumKey);

        int count = 0;
        // 根據 key 獲取 服務對象數據
        ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(datumKey);
        if (recordListeners == null) {
            Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey);
            return;
        }
        for (RecordListener listener : recordListeners) {
            count++;
            try {
                // 如果是新增
                if (action == DataOperation.CHANGE) {
                    Datum datum = dataStore.get(datumKey);
                    if (datum != null) {
                        // 更新 serivce 的實例數據
                        listener.onChange(datumKey, datum.value);
                    } else {
                        Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey);
                    }
                    continue;
                }
                // 如果是刪除
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                       datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

根據action 為 DataOperation.CHANGE,代碼中執行的代碼分支為listener.onChange(datumKey, datum.value),該方法的邏輯為修改服務的實例數據,源碼如下

public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }

        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    // 更新 service 的 實例集合
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}

以上代碼先遍歷所有的實例數據設置權值,再通過updateIPs方法更新服務實例,updateIPs方法的代碼如下:

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    // 根據 clusterMap 創建 ipMap對象
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    // 根據 clusterMap 初始化 ipMap對象
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
	// 遍歷最新的實例集合數據
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            // 如果集群名稱為null ,則設置預設的集群名稱 DEFAULT
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            // 如果當前 service 的clusterMap不包括 實例的 集群名稱,則需要創建新的集群對象
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                          instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            // 如果當前 ipMap 不包括 當前實例的 集群名稱,則需要創建新的集群對象
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
			// 給當前的 集群對象賦值 實例數據。
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
	// 遍歷 ipMap對象,給 clusterMap 替換最新的 entryIPs
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // 給 clusterMap 替換最新的 entryIPs
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    // 發佈
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                         stringBuilder.toString());

}

以上代碼先根據當前服務下的集群信息構造構造ipMap對象,然後遍歷最新的實例集合數據更新ipMap對象,最後迴圈調用clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral)方法來更新當前集群中的實例列表數據。updateIps方法代碼如下:

public void updateIps(List<Instance> ips, boolean ephemeral) {
    // 獲取 本集群中的 實例集合
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    // 根據old的實例數據 構建 hashmap
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    // 根據實例的 key 添加到 oldIpMap中
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    // 獲取更新的 實例數據 List
    List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
    if (updatedIPs.size() > 0) {
        for (Instance ip : updatedIPs) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());

            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only when ip is not marked, don't we update the health status of IP:
            if (!ip.isMarked()) {
                ip.setHealthy(oldIP.isHealthy());
            }
            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                                     (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }

            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                                     ip.toString());
            }
        }
    }
    // 獲取新增的 實例數據
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
            .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                  getName(), newIPs.size(), newIPs.toString());

        for (Instance ip : newIPs) {
            HealthCheckStatus.reset(ip);
        }
    }
    // 獲取刪除的 實例數據
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
            .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                  getName(), deadIPs.size(), deadIPs.toString());

        for (Instance ip : deadIPs) {
            HealthCheckStatus.remv(ip);
        }
    }
    // 根據傳進來的 實例集合 創建需要更新的實例set 
    toUpdateInstances = new HashSet<>(ips);

    // 直接替換
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}

以上代碼就是更新cluster對象下的實例數據邏輯,根據代碼可知在cluster對象中更新實例數據就是拿傳進來的實例列表創建set集合直接替換的。

二、服務實例列表拉取

客戶端程式啟動之後,會執行com.alibaba.cloud.nacos.discovery.NacosWatch類的start()方法,此方法中會執行以下語句,

namingService.subscribe(properties.getService(), properties.getGroup(),
                        Arrays.asList(properties.getClusterName()), eventListener);

此方法用來獲取當前服務的實例數據,subscribe方法代碼如下,

public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
    throws NacosException {
    // 獲取服務列表數據
    hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","),
                          listener);
}

通過hostReactor.subscribe方法獲取服務列表數據,subscribe方法的代碼如下,

public void subscribe(String serviceName, String clusters, EventListener eventListener) {
    notifier.registerListener(serviceName, clusters, eventListener);
    // 獲取服務列表數據
    getServiceInfo(serviceName, clusters);
}

通過getServiceInfo方法獲取服務列表數據,getServiceInfo的代碼如下:

NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
    return failoverReactor.getService(key);
}
// 根據服務名稱和集群名稱獲取本地的服務列表數據
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
    serviceObj = new ServiceInfo(serviceName, clusters);
    serviceInfoMap.put(serviceObj.getKey(), serviceObj);
    updatingMap.put(serviceName, new Object());
    // 如果本地服務實例數據為null,則去獲取最新的服務實例列表
    updateServiceNow(serviceName, clusters);
    updatingMap.remove(serviceName);

} else if (updatingMap.containsKey(serviceName)) {
    if (UPDATE_HOLD_INTERVAL > 0) {
        // hold a moment waiting for update finish
        synchronized (serviceObj) {
            try {
                serviceObj.wait(UPDATE_HOLD_INTERVAL);
            } catch (InterruptedException e) {
                NAMING_LOGGER
                    .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
            }
        }
    }
}
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());

以上代碼可知,會根據服務名稱和clusters名稱獲取本地緩存serviceInfoMap對象中的服務列表數據。如果本地服務實例數據為null,則通過updateServiceNow方法去nacos服務端獲取最新的服務實例列表。updateServiceNow方法代碼如下:

try {
    // 更新本地服務方法
    updateService(serviceName, clusters);
} catch (NacosException e) {
    NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}

updateService的代碼如下:

public void updateService(String serviceName, String clusters) throws NacosException {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
		// 調用服務代理類獲取服務實例列表,pushReceiver.getUdpPort()會隨機生成一個udp埠
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            // 如果 result不為空,則向本地緩存 serviceInfoMap 添加服務實例列表
            processServiceJson(result);
        }
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

通過調用服務代理類serverProxy的queryList方法獲取服務實例列表,pushReceiver.getUdpPort()會獲pushReceiver的udp埠,pushReceiver對象是一個udp數據接收類,用來接收nacos伺服器發送的udp數據,比如服務實例更新的消息。serverProxy.query方法的代碼如下:

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    throws NacosException {
	// 構造請求參數
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    // 客戶端的upd埠,服務端回調客戶端udp埠會用到
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
	// 向nacos伺服器獲取服務列表數據,並返回
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

在構造的請求參數中包括了客戶端的udpPort,該參數在服務端回調介面會用到。reqApi方法其實就向nacos伺服器的/nacos/v1/ns/instance/list介面發送了請求消息,該介面對應的nacos服務端的源碼的naming工程中InstanceController的list方法,代碼如下,

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {

    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    // 獲取實例列表數據
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                     healthyOnly);
}

以上代碼先構造相關參數信息,然後通過doSrvIpxt方法來獲取實例列表數據,doSrvIpxt代碼如下:

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
                            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // 根據命名空間id和服務名稱獲取服務
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();

    // now try to enable the push
    try {
        // 如果埠大於0 ,並且是支持的客戶端
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // 添加 PushClient 對象
            pushService
                .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                           pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG
            .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }
    // 如果服務對象為 null ,則構造數據返回
    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("cacheMillis", cacheMillis);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }
    // 檢查服務是否可用
    checkIfDisabled(service);

    List<Instance> srvedIPs;
    // 根據集群列表獲取具體服務下麵的實例列表
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    // 如果實例數據為空,則構造數據返回
    if (CollectionUtils.isEmpty(srvedIPs)) {

        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }

        if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }

        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.set("hosts", JacksonUtils.createEmptyArrayNode());
        result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    // 構造健康和不健康的實例數據
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }

    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }

    double threshold = service.getProtectThreshold();

    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }

        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }

    if (isCheck) {
        result.put("protectThreshold", service.getProtectThreshold());
        result.put("reachLocalSiteCallThreshold", false);

        return JacksonUtils.createEmptyJsonNode();
    }

    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    // 構造返回的實例列表對象
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();

        if (healthyOnly && !entry.getKey()) {
            continue;
        }

        for (Instance instance : ips) {

            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }

            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }

            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);

        }
    }

    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA
        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}

以上代碼其實就是根據命名空間id和服務名稱獲取服務對象,然後根據不同情況構造返回對象,正常情況會構造一個ServiceInfo類型的ObjectNode對象,整個具體過程請看上面的代碼註釋。最後返回構造的對象。

客戶端中拿到請求/nacos/v1/ns/instance/list介面的返回值之後會轉成一個ServiceInfo對象,並且把該對象賦值給本地的緩存對象serviceInfoMap,processServiceJson關鍵代碼如下:

// 將返回值轉換成 ServiceInfo 類型的對象
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
// 把該對象添加到本地緩存中
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

三、定時心跳任務

在客戶端向nacos服務端註冊服務的過程中,會調用com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(java.lang.String, java.lang.String, com.alibaba.nacos.api.naming.pojo.Instance)方法,在該代碼中有個判斷邏輯,如果是臨時實例則會創建一個BeatInfo對象添加到beatReactor中。代碼如下:

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 檢查 實例是否合法
    // heart beat timeout must(預設15秒) > heart beat interval (預設5秒)
    // ip delete timeout must(預設30 秒)  > heart beat interval
    NamingUtils.checkInstanceIsLegal(instance);
    // 構建 groupName@@serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 如果是臨時實例,則創建心跳信息,定時給nacos服務發送
    if (instance.isEphemeral()) {
        // 構造心跳信息
        BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
        // 執行心跳定時任務
        this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
	// 向 nacos-service 註冊實例
    this.serverProxy.registerService(groupedServiceName, groupName, instance);
}

beatInfo對象用來存儲心跳信息,buildBeatInfo方法代碼如下,

public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
    BeatInfo beatInfo = new BeatInfo();
    beatInfo.setServiceName(groupedServiceName);
    beatInfo.setIp(instance.getIp());
    beatInfo.setPort(instance.getPort());
    beatInfo.setCluster(instance.getClusterName());
    beatInfo.setWeight(instance.getWeight());
    beatInfo.setMetadata(instance.getMetadata());
    beatInfo.setScheduled(false);
    beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
    return beatInfo;
}

beatReactor中有一個ScheduledExecutorService類型的executorService實例用來執行定時的線程,addBeatInfo的代碼如下,

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // 線程池添加定時任務,預設 5 秒鐘之後 執行 BeatTask
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

根據上面的executorService.schedule()代碼可知,BeatTask線程在固定的秒數之後執行,而BeatTask實現了Runnable介面,即執行BeatTask的run方法 。BeatTask的run方法代碼如下,

public void run() {
    // 如果 beatInfo 設置了 stop ,則停止
    if (beatInfo.isStopped()) {
        return;
    }
    // 獲取下一次延期執行的時間
    long nextTime = beatInfo.getPeriod();
    try {
        // 向服務端發送心跳信息
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        if (interval > 0) {
            nextTime = interval;
        }
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                serverProxy.registerService(beatInfo.getServiceName(),
                                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception ignore) {
            }
        }
    } catch (NacosException ex) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                            JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

    }
    // 重新提交定時任務,延期發送心跳信息
    executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}

以上代碼先獲取下一次延期執行的時間,再通過serverProxy.sendBeat()向服務端發送心跳信息,最後重新提交定時任務,延期發送心跳信息,serverProxy.sendBeat()代碼如下,

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    // 向nacos伺服器發送心跳數據,並返回
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

reqApi方法其實就向nacos伺服器端的/nacos/v1/ns/instance/beat介面發送了put類型的請求消息,該介面對應的nacos服務端的源碼的naming工程中InstanceController的beat方法,beat方法的代碼如下,

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    // 獲取心跳信息
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    // 如果 beat 數據不為空,則構造 RsInfo 類型的  clientBeat 實例
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    // 獲取集群名稱
    String clusterName = WebUtils
        .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    // 獲取 實例的 ip
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    // 獲取 實例的 埠
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    // 如果 clientBeat 不為空,則設置 相關的信息
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    // 獲取 namespaceId
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // 獲取 serviceName
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // 檢查 ServiceName 的格式
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // 根據 參數 獲取 具體的實例
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // 如果 實例為 空
    if (instance == null) {
        // 如果 clientBeat 為空 則構造參數 code 為 20404的結果返回
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                             + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        // 如果 clientBeat 不為空 則構造 instance 數據,向 serviceManager 註冊實例。
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());

        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // 根據服務名稱獲取 服務
    Service service = serviceManager.getService(namespaceId, serviceName);
    // 如果服務為空 ,則拋異常
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                                 "service not found: " + serviceName + "@" + namespaceId);
    }
    // 如果 clientBeat 為空,則創建該對象
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // 處理客戶端的 心跳對象
    service.processClientBeat(clientBeat);
    //
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}

先獲取心跳信息,然後構造RsInfo類型的clientBeat實例。然後通過service.processClientBeat(clientBeat)方法處理客戶端的心跳對象,processClientBeat方法的代碼如下,

public void processClientBeat(final RsInfo rsInfo) {
    // 構造 ClientBeatProcessor 對象
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    // 定時執行 ClientBeatProcessor 對象,這裡是立即執行,延期時間為 0
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

ClientBeatProcessor是一個實現了Runnable的類,HealthCheckReactor是一個定時任務線程池,scheduleNow方法表示立即執行clientBeatProcessor對象的run方法,clientBeatProcessor.run方法代碼如下,

public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    // 獲取ip
    String ip = rsInfo.getIp();
    // 獲取 集群名稱
    String clusterName = rsInfo.getCluster();
    // 獲取埠
    int port = rsInfo.getPort();
    // 從服務對象中獲取集群對象
    Cluster cluster = service.getClusterMap().get(clusterName);
    // 從集群對象中獲取所有的臨時實例列表
    List<Instance> instances = cluster.allIPs(true);

    for (Instance instance : instances) {
        // 找到 ip 和埠相同的 實例數據
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // 更新 最後心跳時間
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked() && !instance.isHealthy()) {
                instance.setHealthy(true);
                Loggers.EVT_LOG
                    .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                          cluster.getService().getName(), ip, port, cluster.getName(),
                          UtilsAndCommons.LOCALHOST_SITE);
                getPushService().serviceChanged(service);
            }
        }
    }
}

以上代碼可知,該方法主要用來更新客戶端實例的最後心跳時間。

三、服務端介面

一、定時檢查服務實例任務

在客戶端註冊服務的時候,會調用nacos服務端的com.alibaba.nacos.naming.controllers.InstanceController#register方法,其中會調用createEmptyService方法用來創建空的服務對象,最後會調用service.init()方法用來初始化服務對象,init方法代碼如下

public void init() {
    // 定時執行 service 的 run 方法 處理超時的 instance
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

通過調用HealthCheckReactor.scheduleCheck()方法來定時執行clientBeatCheckTask,scheduleCheck的代碼如下,

public static void scheduleCheck(ClientBeatCheckTask task) {
    // 5秒之後執行 task,並且每次執行task完之後,5秒之後再次執行 task
    futureMap.computeIfAbsent(task.taskKey(),
                              k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

以上代碼給定時任務線程池GlobalExecutor提交了一個task任務,其中task是一個實現了Runable介面的類,線程池每次執行的就是ClientBeatCheckTask 的run方法,run方法代碼如下,

public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        // 獲取該服務下麵的所有 註冊實例集合
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            // 如果 當前時間 減去 實例的最新心跳時間 如果大於 實例配置的心跳超時時間(預設15秒)
            // 並且 實例的健康狀態 true
            // 則設置服務的健康狀態為 false
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                            .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                  instance.getIp(), instance.getPort(), instance.getClusterName(),
                                  service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                  instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }
            // 如果 當前時間 減去 實例的最新心跳時間 如果大於 實例配置的刪除超時時間(預設30秒)
            // 則會調用 deleteIp 刪除方法刪除實例
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                                     JacksonUtils.toJson(instance));
                // 刪除實例
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }

}

以上代碼就兩個邏輯,一個邏輯是判斷當前時間減去實例的最新心跳時間是否大於實例配置的心跳超時時間(預設15秒),如果大於則設置實例的健康狀態為false;第二個邏輯是 判斷當前時間減去實例的最新心跳時間 是否大於實例配置的刪除超時時間(預設30秒),如果大於則調用deleteIp(instance);刪除該實例,deleteIp的代碼如下,

NamingProxy.

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一:背景 1. 講故事 相信有很多朋友在學習 SQLSERVER 的時候都聽說過這句話,但大多都是記憶為主,最近在研究 SQLSERVER,所以我們從 底層存儲 的角度來深入理解下。 二:理解數據頁 1. 數據頁的組織 在前面的文章中我也說過,一個 數據頁 是 8k 大小,那這 8k 是如何組織的呢 ...
  • ​ 1、需求描述 最近碰到了一個需求,是要統計各個團隊的員工的銷售金額,然後一級一級向上彙總。 ​編輯 架構團隊樹是類似於這種樣子的,需要先算出每個員工的銷售金額,然後彙總成上一級的團隊金額,然後各個團隊的銷售總金額再往上彙總成一個區域的銷售金額,然後各個區域的金額再往上彙總成總公司的金額。當然我工 ...
  • 1 mysql 報錯解決mysql> grant all on *.* to "dba"@"%" identified by "mysql123";ERROR 1819 (HY000): Your password does not satisfy the current policy requir ...
  • 開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第3天,點擊查看活動詳情 如果你正需要處理Flutter異常捕獲,那麼恭喜你,找對地了,這裡從根源上給你準備了Flutter異常捕獲需要是所有知識和原理,讓你更深刻認識Flutter Zone概念。 Zone是什麼 /// A zo ...
  • Vue3,webpack,vite 通用 適用於中大型項目中 1.安裝vuex npm i vuex 2.創建倉庫與文件結構(核心) 一,創建入口 在src目錄下創建store文件夾,store文件夾下創建 下麵文件結構 actions.js import * as type from './mut ...
  • 基於jQuery的三種AJAX請求 1. 介紹 get請求 通常用於 獲取服務端資源(向伺服器要資源) ​ 例如:根據URL地址,從伺服器獲取HTML文件、CSS文件、JS文件、圖片文件、數據資源等。 post請求 通常用於 向伺服器提交數據(往伺服器發送資源) ​ 例如:登錄時向伺服器提交的登錄信 ...
  • 1776 年亞當斯密發表《國富論》,標志著經濟學的誕生。2004 年,一本名為《領域驅動設計·軟體核心複雜性應對之道》的書問世,開闢了軟體開發的一個新流派:領域驅動設計。看完這本書,十個人有九個人的感覺都是:似懂非懂,若有所得,掩卷長思,一無所得,我個人的感覺同樣如此。出於興趣,多年來仔細研讀了幾十... ...
  • HttpClient 版本已經到 5.2.1 了. 在版本4中的一些方法已經變成 deprecated, 於是將之前的工具類升級一下, 順便把中間遇到的問題記錄一下 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...