微服務組件-----Spring Cloud Alibaba 註冊中心 Nacos源碼(1.4.x版本)分析

来源:https://www.cnblogs.com/chafry/archive/2022/10/26/16801273.html
-Advertisement-
Play Games

核心功能點 【1】服務註冊:Nacos Client會通過發送REST請求的方式向Nacos Server註冊自己的服務,提供自身的元數據,比如ip地址、埠等信息。Nacos Server接收到註冊請求後,就會把這些元數據信息存儲在一個雙層的記憶體Map中。 【2】服務心跳:在服務註冊後,Nacos ...


 

核心功能點

【1】服務註冊:Nacos Client會通過發送REST請求的方式向Nacos Server註冊自己的服務,提供自身的元數據,比如ip地址、埠等信息。Nacos Server接收到註冊請求後,就會把這些元數據信息存儲在一個雙層的記憶體Map中。

【2】服務心跳:在服務註冊後,Nacos Client會維護一個定時心跳來持續通知Nacos Server,說明服務一直處於可用狀態,防止被剔除。預設5s發送一次心跳

【3】服務同步:Nacos Server集群之間會互相同步服務實例,用來保證服務信息的一致性。  

【4】服務發現:服務消費者(Nacos Client)在調用服務提供者的服務時,會發送一個REST請求給Nacos Server,獲取上面註冊的服務清單,並且緩存在Nacos Client本地,同時會在Nacos Client本地開啟一個定時任務定時拉取服務端最新的註冊表信息更新到本地緩存

【5】服務健康檢查:Nacos Server會開啟一個定時任務用來檢查註冊服務實例的健康情況,對於超過15s沒有收到客戶端心跳的實例會將它的healthy屬性置為false(客戶端服務發現時不會發現),如果某個實例超過30秒沒有收到心跳,直接剔除該實例(被剔除的實例如果恢複發送心跳則會重新註冊)

 

源碼精髓總結

【1】註冊表的結構說明(這個僅是記錄):

//Map<namespaceId, Map<service_name, Service>【ConcurrentSkipListMap】>
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
//再分析裡面的Service,Map<clusterName, Cluster>
private Map<String, Cluster> clusterMap = new HashMap<>();
//再分析Cluster
private Set<Instance> persistentInstances = new HashSet<>();
private Set<Instance> ephemeralInstances = new HashSet<>();

【2】分析註冊表為何要這麼設計

1.註冊表是基於第一層ConcurrentHashMap,第二層ConcurrentSkipListMap,第三層HashMap,然後定位到對應的Cluster。
2.至於為什麼要這樣設計,一方面是將粒度劃分的更細,通過源碼分析可知,nacos更新註冊表是進行小範圍的更新,如定位到Cluster的臨時列表ephemeralInstances或者持久列表persistentInstances【這兩個都是set集合,所以排除了會有重覆的數據】。因為粒度小所以更新速度會更快。
3.其次採用的是 寫時複製思想,也就是說,不會影響讀取的效率,因為是新開一個副本,將新舊的數據合併到一個新數據裡面,然後將引用指向新數據。
4.其次是為了高擴展,對namespace進行劃分【對開發環境隔離】,對service進行劃分【對服務進行隔離】,對Cluster進行劃分【多機房部署,加快訪問速度】
5.為瞭解決併發讀寫問題,採用的是ConcurrentHashMap與ConcurrentSkipListMap的分段鎖,加上Cluster裡面的寫時複製。其次Cluster裡面是不加鎖的,因為是單線程進行修改,不存在衝突。
6.雖說犧牲了,一定的實時性,但是大大提高了併發的性能。

【3】分析AP架構下為什麼高性能的原因

1.因為採用的是非同步任務加隊列的形式來實現註冊的,所以響應很快,然後任務是慢慢做的。
2.Notifier 是在DistroConsistencyServiceImpl類中初始化,預設單線程,而且隊列為ArrayBlockingQueue<>(1024 * 1024)。
3.縮小了變更數據的粒度,單線程避免了線程安全問題【不用加鎖】。
4.這種方式毫無疑問是會存在問題的,就是響應了但是沒有註冊上。但是對於這個問題,在客戶端裡面做了心跳機制,如果檢測不到會重新註冊。

 【4】分析Nacos為什麼感知快的原因

採用的是客戶端定時進行一次拉取,兼服務端採用非同步的形式使用UDP發送更新的數據到客戶端;
雖然UDP存在通知丟失的情況,但是每隔1s的拉取依舊能很好的保持數據的最終一致性。

 

源碼分析

驗證服務端

【1】在啟動的時候我們一般是調用shell腳本啟動,查看startup.sh腳本

  從以下看實際上是調用了java命令啟動了個java的項目(-jar ${BASE_DIR}/target/${SERVER}.jar 將參數對應替換後 -jar ${BASE_DIR}/target/nacos-server.jar)

  去尋找啟動入口的時候會發現,它其實是SpringBoot搭建的一個WEB服務。

cygwin=false
darwin=false
os400=false
case "`uname`" in
CYGWIN*) cygwin=true;;
Darwin*) darwin=true;;
OS400*) os400=true;;
esac
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java
[ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME

if [ -z "$JAVA_HOME" ]; then
  if $darwin; then

    if [ -x '/usr/libexec/java_home' ] ; then
      export JAVA_HOME=`/usr/libexec/java_home`

    elif [ -d "/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home" ]; then
      export JAVA_HOME="/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home"
    fi
  else
    JAVA_PATH=`dirname $(readlink -f $(which javac))`
    if [ "x$JAVA_PATH" != "x" ]; then
      export JAVA_HOME=`dirname $JAVA_PATH 2>/dev/null`
    fi
  fi
  if [ -z "$JAVA_HOME" ]; then
        error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)! jdk8 or later is better!"
  fi
fi

export SERVER="nacos-server"
export MODE="cluster"
export FUNCTION_MODE="all"
export MEMBER_LIST=""
export EMBEDDED_STORAGE=""
while getopts ":m:f:s:c:p:" opt
do
    case $opt in
        m)
            MODE=$OPTARG;;
        f)
            FUNCTION_MODE=$OPTARG;;
        s)
            SERVER=$OPTARG;;
        c)
            MEMBER_LIST=$OPTARG;;
        p)
            EMBEDDED_STORAGE=$OPTARG;;
        ?)
        echo "Unknown parameter"
        exit 1;;
    esac
done

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=`cd $(dirname $0)/..; pwd`
export CUSTOM_SEARCH_LOCATIONS=file:${BASE_DIR}/conf/

#===========================================================================================
# JVM Configuration
#===========================================================================================
if [[ "${MODE}" == "standalone" ]]; then
    JAVA_OPT="${JAVA_OPT} -Xms512m -Xmx512m -Xmn256m"
    JAVA_OPT="${JAVA_OPT} -Dnacos.standalone=true"
else
    if [[ "${EMBEDDED_STORAGE}" == "embedded" ]]; then
        JAVA_OPT="${JAVA_OPT} -DembeddedStorage=true"
    fi
    JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${BASE_DIR}/logs/java_heapdump.hprof"
    JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"

fi

if [[ "${FUNCTION_MODE}" == "config" ]]; then
    JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=config"
elif [[ "${FUNCTION_MODE}" == "naming" ]]; then
    JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=naming"
fi

JAVA_OPT="${JAVA_OPT} -Dnacos.member.list=${MEMBER_LIST}"

JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
  JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${BASE_DIR}/logs/nacos_gc.log:time,tags:filecount=10,filesize=102400"
else
  JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
  JAVA_OPT="${JAVA_OPT} -Xloggc:${BASE_DIR}/logs/nacos_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
fi

JAVA_OPT="${JAVA_OPT} -Dloader.path=${BASE_DIR}/plugins/health,${BASE_DIR}/plugins/cmdb"
JAVA_OPT="${JAVA_OPT} -Dnacos.home=${BASE_DIR}"
JAVA_OPT="${JAVA_OPT} -jar ${BASE_DIR}/target/${SERVER}.jar"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} --spring.config.additional-location=${CUSTOM_SEARCH_LOCATIONS}"
JAVA_OPT="${JAVA_OPT} --logging.config=${BASE_DIR}/conf/nacos-logback.xml"
JAVA_OPT="${JAVA_OPT} --server.max-http-header-size=524288"

if [ ! -d "${BASE_DIR}/logs" ]; then
  mkdir ${BASE_DIR}/logs
fi

echo "$JAVA ${JAVA_OPT}"

if [[ "${MODE}" == "standalone" ]]; then
    echo "nacos is starting with standalone"
else
    echo "nacos is starting with cluster"
fi

# check the start.out log output file
if [ ! -f "${BASE_DIR}/logs/start.out" ]; then
  touch "${BASE_DIR}/logs/start.out"
fi
# start
echo "$JAVA ${JAVA_OPT}" > ${BASE_DIR}/logs/start.out 2>&1 &
nohup $JAVA ${JAVA_OPT} nacos.nacos >> ${BASE_DIR}/logs/start.out 2>&1 &
echo "nacos is starting,you can check the ${BASE_DIR}/logs/start.out"

 

從客戶端開始分析

【1】根據自動裝配原理(尋找spring.factories文件配置)

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

【2】分析NacosDiscoveryAutoConfiguration類自動裝配了什麼

@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    //可以看出是將上面兩個Bean當做參數傳入了這個Bean
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
    }
}

 

 【3】分析NacosAutoServiceRegistration類有什麼重要性

  利用監聽機制,達到註冊服務的目的。監聽WebServer初始化事件

//class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration>
//abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent>
//因為繼承了ApplicationListener,必然會有監聽方法
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}

public void start() {
    if (!isEnabled()) {return;
    }

    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    if (!this.running.get()) {
        this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
        register();
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }

}

protected void register() {
    this.serviceRegistry.register(getRegistration());
}

@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        return;
    }

    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();

    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        namingService.registerInstance(serviceId, group, instance);
    }
    catch (Exception e) {
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}

 

【4】分析如何註冊的【服務註冊

//NacosNamingService類的registerInstance方法
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
    registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}

//NacosNamingService類#registerInstance方法
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
     //添加一個延時執行的心跳任務 beatReactor.addBeatInfo(groupedServiceName, beatInfo); }
   //進行服務註冊 serverProxy.registerService(groupedServiceName, groupName, instance); }
//NamingProxy類#registerService方法 public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { //構建註冊參數 final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); //向服務端發送請求 //UtilAndComs.nacosUrlInstance=/nacos/v1/ns/instance 也就是官網所示的註冊介面地址 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); } public String reqApi(String api, Map<String, String> params, String method) throws NacosException { return reqApi(api, params, Collections.EMPTY_MAP, method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException { return reqApi(api, params, body, getServerList(), method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException { params.put(CommonParams.NAMESPACE_ID, getNamespaceId()); if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) { throw new NacosException(...); } NacosException exception = new NacosException(); if (StringUtils.isNotBlank(nacosDomain)) { for (int i = 0; i < maxRetry; i++) { try { return callServer(api, params, body, nacosDomain, method); } catch (NacosException e) { exception = e; } } } else { Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); for (int i = 0; i < servers.size(); i++) { String server = servers.get(index); try { return callServer(api, params, body, server, method); } catch (NacosException e) { exception = e; } index = (index + 1) % servers.size(); } } throw new NacosException(...); } public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException { long start = System.currentTimeMillis(); long end = 0; injectSecurityInfo(params); Header header = builderHeader(); String url; if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) { url = curServer + api; } else { if (!IPUtil.containsPort(curServer)) { curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort; } url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; } try { //真正遠程調用 HttpRestResult<String> restResult = nacosRestTemplate .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start); if (restResult.ok()) { return restResult.getData(); } if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) { return StringUtils.EMPTY; } throw new NacosException(restResult.getCode(), restResult.getMessage()); } catch (Exception e) { throw new NacosException(NacosException.SERVER_ERROR, e); } } public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception { RequestHttpEntity requestHttpEntity = new RequestHttpEntity( header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues); return execute(url, httpMethod, requestHttpEntity, responseType); } private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception { URI uri = HttpUtils.buildUri(url, requestEntity.getQuery()); ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType); HttpClientResponse response = null; try { //使用JdkHttpClientRequest去發起請求 response = this.requestClient().execute(uri, httpMethod, requestEntity); return responseHandler.handle(response); } finally { if (response != null) { response.close(); } } } //JdkHttpClientRequest類#execute方法 @Override public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception { final Object body = requestHttpEntity.getBody(); final Header headers = requestHttpEntity.getHeaders(); replaceDefaultConfig(requestHttpEntity.getHttpClientConfig()); HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection(); Map<String, String> headerMap = headers.getHeader(); if (headerMap != null && headerMap.size() > 0) { for (Map.Entry<String, String> entry : headerMap.entrySet()) { conn.setRequestProperty(entry.getKey(), entry.getValue()); } } conn.setConnectTimeout(this.httpClientConfig.getConTimeOutMillis()); conn.setReadTimeout(this.httpClientConfig.getReadTimeOutMillis()); conn.setRequestMethod(httpMethod); if (body != null && !"".equals(body)) { String contentType = headers.getValue(HttpHeaderConsts.CONTENT_TYPE); String bodyStr = JacksonUtils.toJson(body); if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) { Map<String, String> map = JacksonUtils.toObj(bodyStr, HashMap.class); bodyStr = HttpUtils.encodingParams(map, headers.getCharset()); } if (bodyStr != null) { conn.setDoOutput(true); byte[] b = bodyStr.getBytes(); conn.setRequestProperty("Content-Length", String.valueOf(b.length)); OutputStream outputStream = conn.getOutputStream(); outputStream.write(b, 0, b.length); outputStream.flush(); IoUtils.closeQuietly(outputStream); } } conn.connect(); return new JdkHttpClientResponse(conn); }

 

【5】beatReactor.addBeatInfo  心跳任務的流程【服務心跳

//BeatReactor類#構造方法
public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;
    //定義延遲的線程池
    this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
}

//添加任務方法
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    //實際上就是往延遲的線程池添加任務
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

//分析心跳任務類,主要都是run方法
//這種調用方式eureka中也是
class BeatTask implements Runnable {
    
    BeatInfo beatInfo;
    
    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }
    
    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            //調用server代理實例發送心跳介面
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            //服務返回沒有,則再次註冊
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    //又是一個註冊方法的調用
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {...}
        //方法內再次將任務塞入,形成迴圈調用
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    //地址為/nacos/v1/ns/instance/beat
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

 

【6】分析如何引入服務的【服務發現

//NacosNamingService類#getAllInstances方法
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
    
    ServiceInfo serviceInfo;
    // 是否是訂閱模式,預設是true
    if (subscribe) {
        // 先從客戶端緩存獲取服務信息
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        // 如果本地緩存不存在服務信息,則進行訂閱
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    List<Instance> list;
    // 從服務信息中獲取實例列表
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

 

【6.1】分析先從緩存中拿的hostReactor.getServiceInfo方法

//獲取服務信息
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    //獲取服務的信息
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    //客戶端第一次獲取這個註冊表信息為空
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());

        //會去拉取這個註冊中心裡面的註冊表信息
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
        
    } 
    //如果本地緩存裡面已有這個註冊表信息
    else if (updatingMap.containsKey(serviceName)) {
        
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {...}
            }
        }
    }
    //客戶端會開啟一個定時任務,每隔幾秒會去拉取註冊中心裡面的全部實例的信息
    scheduleUpdateIfAbsent(serviceName, clusters);
    
    return serviceInfoMap.get(serviceObj.getKey());
}

//HostReactor類# Map<String, ServiceInfo> serviceInfoMap屬性【這個便是客戶端保存實例數據的緩存所在】
//實際上是先從serviceInfoMap屬性裡面拿的
private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
    String key = ServiceInfo.getKey(serviceName, clusters);
    return serviceInfoMap.get(key);
}

 

【6.1.1】分析遠程拉取流程updateServiceNow方法

private void updateServiceNow(String serviceName, String clusters) {
    try {
        updateService(serviceName, clusters);
    } catch (NacosException e) {...}
}

public void updateService(String serviceName, String clusters) throws NacosException {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        //遠程調用
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        
        if (StringUtils.isNotEmpty(result)) {
            //處理並塞入serviceInfoMap,還會發送一個InstancesChangeEvent事件
            processServiceJson(result);
        }
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
    
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
    //調用服務的API,

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 模板方法模式是一種行為設計模式,它在超類中定義了一個演算法的框架,允許子類在不修改結構的情況下重寫演算法的特定步驟。 ...
  • 您好,我是湘王,這是我的博客園,歡迎您來,歡迎您再來~ 為了提高CPU的利用率,工程師們創造了多線程。但是線程們說:要有光!(為了減少線程創建(T1啟動)和銷毀(T3切換)的時間),於是工程師們又接著創造了線程池ThreadPool。就這樣就可以了嗎?——不,工程師們並不滿足於此,他們不把自己創造出 ...
  • 之所以提這個坑,是因為,今天下午,通過監控系統,發現我們系統生產能力突然下降,頻繁報無法獲取資料庫連接。究其原因,竟然是因為mybatisplus的這個“坑”導致的。 ...
  • 圍棋要實現什麼功能呢? 首先是黑棋和白棋,下麵的代碼,黑棋占領的位置被賦值為1,白棋是2; 其次有幾個圍棋中的規則(不知道是不是這樣的,老師是這麼告訴我的,可以指正):1.塊:上下左右可以連起來的叫塊,一顆棋上下左右四個方向若有棋子,就可以稱作是一塊,一個棋子上下左右都沒有,也可以稱作一塊。2.氣: ...
  • 前言 嗨嘍~大家好呀,這裡是魔王吶 ! 網上有很多的創意二維碼,看了,別的不說 羡慕肯定是有的,羡慕有了這不得自己整點活~ 今天我們就來試試只用幾行代碼,生成動態二維碼! 開發環境: Python 3.8 Pycharm 模塊使用: 第三方模塊 需要安裝 在cmd裡面 進行 pip install ...
  • Java 8 系列文章 持續更新中 日期時間API 也是Java 8重要的更新之一,Java從一開始就缺少一致的日期和時間方法,Java 8 Date Time API是Java核心API的一個非常好的補充。 為什麼需要新的日期時間API Java中現有的與日期和時間相關的類存在一些問題: 日期時間 ...
  • 這裡我們以QQ郵箱為例。 一、導入依賴: <dependencies> <!-- https://mvnrepository.com/artifact/javax.activation/activation --> <dependency> <groupId>javax.activation</gr ...
  • 泛型 泛型定義 Scala的泛型和Java中的泛型表達的含義都是一樣的,對處理的數據類型進行約束,但是Scala提供了更加強大的功能 scala中的泛型採用中括弧 scala中的泛型是不可變的 泛型和類型不是一個層面的東西 所以scala中泛型和類型無法聯合使用 泛型語法 如果能將類型和泛型當成一個 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...