核心功能點 【1】服務註冊:Nacos Client會通過發送REST請求的方式向Nacos Server註冊自己的服務,提供自身的元數據,比如ip地址、埠等信息。Nacos Server接收到註冊請求後,就會把這些元數據信息存儲在一個雙層的記憶體Map中。 【2】服務心跳:在服務註冊後,Nacos ...
核心功能點
【1】服務註冊:Nacos Client會通過發送REST請求的方式向Nacos Server註冊自己的服務,提供自身的元數據,比如ip地址、埠等信息。Nacos Server接收到註冊請求後,就會把這些元數據信息存儲在一個雙層的記憶體Map中。
【2】服務心跳:在服務註冊後,Nacos Client會維護一個定時心跳來持續通知Nacos Server,說明服務一直處於可用狀態,防止被剔除。預設5s發送一次心跳。
【3】服務同步:Nacos Server集群之間會互相同步服務實例,用來保證服務信息的一致性。
【4】服務發現:服務消費者(Nacos Client)在調用服務提供者的服務時,會發送一個REST請求給Nacos Server,獲取上面註冊的服務清單,並且緩存在Nacos Client本地,同時會在Nacos Client本地開啟一個定時任務定時拉取服務端最新的註冊表信息更新到本地緩存
【5】服務健康檢查:Nacos Server會開啟一個定時任務用來檢查註冊服務實例的健康情況,對於超過15s沒有收到客戶端心跳的實例會將它的healthy屬性置為false(客戶端服務發現時不會發現),如果某個實例超過30秒沒有收到心跳,直接剔除該實例(被剔除的實例如果恢複發送心跳則會重新註冊)
源碼精髓總結
【1】註冊表的結構說明(這個僅是記錄):
//Map<namespaceId, Map<service_name, Service>【ConcurrentSkipListMap】> private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); //再分析裡面的Service,Map<clusterName, Cluster> private Map<String, Cluster> clusterMap = new HashMap<>(); //再分析Cluster private Set<Instance> persistentInstances = new HashSet<>(); private Set<Instance> ephemeralInstances = new HashSet<>();
【2】分析註冊表為何要這麼設計
1.註冊表是基於第一層ConcurrentHashMap,第二層ConcurrentSkipListMap,第三層HashMap,然後定位到對應的Cluster。 2.至於為什麼要這樣設計,一方面是將粒度劃分的更細,通過源碼分析可知,nacos更新註冊表是進行小範圍的更新,如定位到Cluster的臨時列表ephemeralInstances或者持久列表persistentInstances【這兩個都是set集合,所以排除了會有重覆的數據】。因為粒度小所以更新速度會更快。 3.其次採用的是 寫時複製思想,也就是說,不會影響讀取的效率,因為是新開一個副本,將新舊的數據合併到一個新數據裡面,然後將引用指向新數據。 4.其次是為了高擴展,對namespace進行劃分【對開發環境隔離】,對service進行劃分【對服務進行隔離】,對Cluster進行劃分【多機房部署,加快訪問速度】 5.為瞭解決併發讀寫問題,採用的是ConcurrentHashMap與ConcurrentSkipListMap的分段鎖,加上Cluster裡面的寫時複製。其次Cluster裡面是不加鎖的,因為是單線程進行修改,不存在衝突。 6.雖說犧牲了,一定的實時性,但是大大提高了併發的性能。
【3】分析AP架構下為什麼高性能的原因
1.因為採用的是非同步任務加隊列的形式來實現註冊的,所以響應很快,然後任務是慢慢做的。 2.Notifier 是在DistroConsistencyServiceImpl類中初始化,預設單線程,而且隊列為ArrayBlockingQueue<>(1024 * 1024)。 3.縮小了變更數據的粒度,單線程避免了線程安全問題【不用加鎖】。 4.這種方式毫無疑問是會存在問題的,就是響應了但是沒有註冊上。但是對於這個問題,在客戶端裡面做了心跳機制,如果檢測不到會重新註冊。
【4】分析Nacos為什麼感知快的原因
採用的是客戶端定時進行一次拉取,兼服務端採用非同步的形式使用UDP發送更新的數據到客戶端;
雖然UDP存在通知丟失的情況,但是每隔1s的拉取依舊能很好的保持數據的最終一致性。
源碼分析
驗證服務端
【1】在啟動的時候我們一般是調用shell腳本啟動,查看startup.sh腳本
從以下看實際上是調用了java命令啟動了個java的項目(-jar ${BASE_DIR}/target/${SERVER}.jar 將參數對應替換後 -jar ${BASE_DIR}/target/nacos-server.jar)
去尋找啟動入口的時候會發現,它其實是SpringBoot搭建的一個WEB服務。
cygwin=false darwin=false os400=false case "`uname`" in CYGWIN*) cygwin=true;; Darwin*) darwin=true;; OS400*) os400=true;; esac error_exit () { echo "ERROR: $1 !!" exit 1 } [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java [ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME if [ -z "$JAVA_HOME" ]; then if $darwin; then if [ -x '/usr/libexec/java_home' ] ; then export JAVA_HOME=`/usr/libexec/java_home` elif [ -d "/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home" ]; then export JAVA_HOME="/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home" fi else JAVA_PATH=`dirname $(readlink -f $(which javac))` if [ "x$JAVA_PATH" != "x" ]; then export JAVA_HOME=`dirname $JAVA_PATH 2>/dev/null` fi fi if [ -z "$JAVA_HOME" ]; then error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)! jdk8 or later is better!" fi fi export SERVER="nacos-server" export MODE="cluster" export FUNCTION_MODE="all" export MEMBER_LIST="" export EMBEDDED_STORAGE="" while getopts ":m:f:s:c:p:" opt do case $opt in m) MODE=$OPTARG;; f) FUNCTION_MODE=$OPTARG;; s) SERVER=$OPTARG;; c) MEMBER_LIST=$OPTARG;; p) EMBEDDED_STORAGE=$OPTARG;; ?) echo "Unknown parameter" exit 1;; esac done export JAVA_HOME export JAVA="$JAVA_HOME/bin/java" export BASE_DIR=`cd $(dirname $0)/..; pwd` export CUSTOM_SEARCH_LOCATIONS=file:${BASE_DIR}/conf/ #=========================================================================================== # JVM Configuration #=========================================================================================== if [[ "${MODE}" == "standalone" ]]; then JAVA_OPT="${JAVA_OPT} -Xms512m -Xmx512m -Xmn256m" JAVA_OPT="${JAVA_OPT} -Dnacos.standalone=true" else if [[ "${EMBEDDED_STORAGE}" == "embedded" ]]; then JAVA_OPT="${JAVA_OPT} -DembeddedStorage=true" fi JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${BASE_DIR}/logs/java_heapdump.hprof" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages" fi if [[ "${FUNCTION_MODE}" == "config" ]]; then JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=config" elif [[ "${FUNCTION_MODE}" == "naming" ]]; then JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=naming" fi JAVA_OPT="${JAVA_OPT} -Dnacos.member.list=${MEMBER_LIST}" JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p') if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${BASE_DIR}/logs/nacos_gc.log:time,tags:filecount=10,filesize=102400" else JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext" JAVA_OPT="${JAVA_OPT} -Xloggc:${BASE_DIR}/logs/nacos_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M" fi JAVA_OPT="${JAVA_OPT} -Dloader.path=${BASE_DIR}/plugins/health,${BASE_DIR}/plugins/cmdb" JAVA_OPT="${JAVA_OPT} -Dnacos.home=${BASE_DIR}" JAVA_OPT="${JAVA_OPT} -jar ${BASE_DIR}/target/${SERVER}.jar" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" JAVA_OPT="${JAVA_OPT} --spring.config.additional-location=${CUSTOM_SEARCH_LOCATIONS}" JAVA_OPT="${JAVA_OPT} --logging.config=${BASE_DIR}/conf/nacos-logback.xml" JAVA_OPT="${JAVA_OPT} --server.max-http-header-size=524288" if [ ! -d "${BASE_DIR}/logs" ]; then mkdir ${BASE_DIR}/logs fi echo "$JAVA ${JAVA_OPT}" if [[ "${MODE}" == "standalone" ]]; then echo "nacos is starting with standalone" else echo "nacos is starting with cluster" fi # check the start.out log output file if [ ! -f "${BASE_DIR}/logs/start.out" ]; then touch "${BASE_DIR}/logs/start.out" fi # start echo "$JAVA ${JAVA_OPT}" > ${BASE_DIR}/logs/start.out 2>&1 & nohup $JAVA ${JAVA_OPT} nacos.nacos >> ${BASE_DIR}/logs/start.out 2>&1 & echo "nacos is starting,you can check the ${BASE_DIR}/logs/start.out"
從客戶端開始分析
【1】根據自動裝配原理(尋找spring.factories文件配置)
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\ com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
【2】分析NacosDiscoveryAutoConfiguration類自動裝配了什麼
@Configuration @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class }) public class NacosDiscoveryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(nacosDiscoveryProperties, context); } //可以看出是將上面兩個Bean當做參數傳入了這個Bean @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } }
【3】分析NacosAutoServiceRegistration類有什麼重要性
利用監聽機制,達到註冊服務的目的。監聽WebServer初始化事件
//class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> //abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> //因為繼承了ApplicationListener,必然會有監聽方法 public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); } @Deprecated public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); } public void start() { if (!isEnabled()) {return; } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get()) { this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } } protected void register() { this.serviceRegistry.register(getRegistration()); } @Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { return; } NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); Instance instance = getNacosInstanceFromRegistration(registration); try { namingService.registerInstance(serviceId, group, instance); } catch (Exception e) { // rethrow a RuntimeException if the registration is failed. // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132 rethrowRuntimeException(e); } }
【4】分析如何註冊的【服務註冊】
//NacosNamingService類的registerInstance方法 @Override public void registerInstance(String serviceName, Instance instance) throws NacosException { registerInstance(serviceName, Constants.DEFAULT_GROUP, instance); } //NacosNamingService類#registerInstance方法 @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
//添加一個延時執行的心跳任務 beatReactor.addBeatInfo(groupedServiceName, beatInfo); }
//進行服務註冊 serverProxy.registerService(groupedServiceName, groupName, instance); } //NamingProxy類#registerService方法 public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { //構建註冊參數 final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); //向服務端發送請求 //UtilAndComs.nacosUrlInstance=/nacos/v1/ns/instance 也就是官網所示的註冊介面地址 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); } public String reqApi(String api, Map<String, String> params, String method) throws NacosException { return reqApi(api, params, Collections.EMPTY_MAP, method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException { return reqApi(api, params, body, getServerList(), method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException { params.put(CommonParams.NAMESPACE_ID, getNamespaceId()); if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) { throw new NacosException(...); } NacosException exception = new NacosException(); if (StringUtils.isNotBlank(nacosDomain)) { for (int i = 0; i < maxRetry; i++) { try { return callServer(api, params, body, nacosDomain, method); } catch (NacosException e) { exception = e; } } } else { Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); for (int i = 0; i < servers.size(); i++) { String server = servers.get(index); try { return callServer(api, params, body, server, method); } catch (NacosException e) { exception = e; } index = (index + 1) % servers.size(); } } throw new NacosException(...); } public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException { long start = System.currentTimeMillis(); long end = 0; injectSecurityInfo(params); Header header = builderHeader(); String url; if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) { url = curServer + api; } else { if (!IPUtil.containsPort(curServer)) { curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort; } url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; } try { //真正遠程調用 HttpRestResult<String> restResult = nacosRestTemplate .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start); if (restResult.ok()) { return restResult.getData(); } if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) { return StringUtils.EMPTY; } throw new NacosException(restResult.getCode(), restResult.getMessage()); } catch (Exception e) { throw new NacosException(NacosException.SERVER_ERROR, e); } } public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception { RequestHttpEntity requestHttpEntity = new RequestHttpEntity( header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues); return execute(url, httpMethod, requestHttpEntity, responseType); } private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception { URI uri = HttpUtils.buildUri(url, requestEntity.getQuery()); ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType); HttpClientResponse response = null; try { //使用JdkHttpClientRequest去發起請求 response = this.requestClient().execute(uri, httpMethod, requestEntity); return responseHandler.handle(response); } finally { if (response != null) { response.close(); } } } //JdkHttpClientRequest類#execute方法 @Override public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception { final Object body = requestHttpEntity.getBody(); final Header headers = requestHttpEntity.getHeaders(); replaceDefaultConfig(requestHttpEntity.getHttpClientConfig()); HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection(); Map<String, String> headerMap = headers.getHeader(); if (headerMap != null && headerMap.size() > 0) { for (Map.Entry<String, String> entry : headerMap.entrySet()) { conn.setRequestProperty(entry.getKey(), entry.getValue()); } } conn.setConnectTimeout(this.httpClientConfig.getConTimeOutMillis()); conn.setReadTimeout(this.httpClientConfig.getReadTimeOutMillis()); conn.setRequestMethod(httpMethod); if (body != null && !"".equals(body)) { String contentType = headers.getValue(HttpHeaderConsts.CONTENT_TYPE); String bodyStr = JacksonUtils.toJson(body); if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) { Map<String, String> map = JacksonUtils.toObj(bodyStr, HashMap.class); bodyStr = HttpUtils.encodingParams(map, headers.getCharset()); } if (bodyStr != null) { conn.setDoOutput(true); byte[] b = bodyStr.getBytes(); conn.setRequestProperty("Content-Length", String.valueOf(b.length)); OutputStream outputStream = conn.getOutputStream(); outputStream.write(b, 0, b.length); outputStream.flush(); IoUtils.closeQuietly(outputStream); } } conn.connect(); return new JdkHttpClientResponse(conn); }
【5】beatReactor.addBeatInfo 心跳任務的流程【服務心跳】
//BeatReactor類#構造方法 public BeatReactor(NamingProxy serverProxy, int threadCount) { this.serverProxy = serverProxy; //定義延遲的線程池 this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.beat.sender"); return thread; } }); } //添加任務方法 public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); //實際上就是往延遲的線程池添加任務 executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } //分析心跳任務類,主要都是run方法 //這種調用方式eureka中也是 class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } long nextTime = beatInfo.getPeriod(); try { //調用server代理實例發送心跳介面 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } //服務返回沒有,則再次註冊 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { //又是一個註冊方法的調用 serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) {...} //方法內再次將任務塞入,形成迴圈調用 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } } public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { Map<String, String> params = new HashMap<String, String>(8); Map<String, String> bodyMap = new HashMap<String, String>(2); if (!lightBeatEnabled) { bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); //地址為/nacos/v1/ns/instance/beat String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result); }
【6】分析如何引入服務的【服務發現】
//NacosNamingService類#getAllInstances方法 @Override public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; // 是否是訂閱模式,預設是true if (subscribe) { // 先從客戶端緩存獲取服務信息 serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { // 如果本地緩存不存在服務信息,則進行訂閱 serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } List<Instance> list; // 從服務信息中獲取實例列表 if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList<Instance>(); } return list; }
【6.1】分析先從緩存中拿的hostReactor.getServiceInfo方法
//獲取服務信息 public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } //獲取服務的信息 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); //客戶端第一次獲取這個註冊表信息為空 if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); //會去拉取這個註冊中心裡面的註冊表信息 updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } //如果本地緩存裡面已有這個註冊表信息 else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) {...} } } } //客戶端會開啟一個定時任務,每隔幾秒會去拉取註冊中心裡面的全部實例的信息 scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); } //HostReactor類# Map<String, ServiceInfo> serviceInfoMap屬性【這個便是客戶端保存實例數據的緩存所在】 //實際上是先從serviceInfoMap屬性裡面拿的 private ServiceInfo getServiceInfo0(String serviceName, String clusters) { String key = ServiceInfo.getKey(serviceName, clusters); return serviceInfoMap.get(key); }
【6.1.1】分析遠程拉取流程updateServiceNow方法
private void updateServiceNow(String serviceName, String clusters) { try { updateService(serviceName, clusters); } catch (NacosException e) {...} } public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { //遠程調用 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { //處理並塞入serviceInfoMap,還會發送一個InstancesChangeEvent事件 processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } } public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); //調用服務的API,