手撕Nacos源碼,今日撕服務戶端源碼

来源:https://www.cnblogs.com/jiagooushi/archive/2022/07/08/16458108.html
-Advertisement-
Play Games

緊接上文,我們分析了Nacos的客戶端代碼, 今天我們再來試一下服務端 ,至此就可以Nacos源碼就告一段落,歡迎大家品鑒。 nacos服務端 註冊中心服務端的主要功能包括,接收客戶端的服務註冊,服務發現,服務下線的功能,但是除了這些和客戶端的交互之外,服務端還要做一些更重要的事情,就是我們常常會在 ...


緊接上文,我們分析了Nacos的客戶端代碼,

今天我們再來試一下服務端 ,至此就可以Nacos源碼就告一段落,歡迎大家品鑒。

nacos服務端

註冊中心服務端的主要功能包括,接收客戶端的服務註冊,服務發現,服務下線的功能,但是除了這些和客戶端的交互之外,服務端還要做一些更重要的事情,就是我們常常會在分散式系統中聽到的AP和CP,作為一個集群,nacos即實現了AP也實現了CP,其中AP使用的自己實現的Distro協議,而CP是採用raft協議實現的,這個過程中牽涉到心跳、選主等操作。

我們來學習一下註冊中心服務端接收客戶端服務註冊的功能。

註冊處理

我們先來學習一下Nacos的工具類WebUtils,該工具類在nacos-core工程下,該工具類是用於處理請求參數轉化的,裡面提供了2個常被用到的方法required()optional()

required方法通過參數名key,解析HttpServletRequest請求中的參數,並轉碼為UTF-8編碼。

optional方法在required方法的基礎上增加了預設值,如果獲取不到,則返回預設值。

代碼如下:

/**
 * required方法通過參數名key,解析HttpServletRequest請求中的參數,並轉碼為UTF-8編碼。
 */
public static String required(final HttpServletRequest req, final String key) {
    String value = req.getParameter(key);
    if (StringUtils.isEmpty(value)) {
        throw new IllegalArgumentException("Param '" + key + "' is required.");
    }
    String encoding = req.getParameter("encoding");
    return resolveValue(value, encoding);
}

/**
 * optional方法在required方法的基礎上增加了預設值,如果獲取不到,則返回預設值。
 */
public static String optional(final HttpServletRequest req, final String key, final String defaultValue) {
    if (!req.getParameterMap().containsKey(key) || req.getParameterMap().get(key)[0] == null) {
        return defaultValue;
    }
    String value = req.getParameter(key);
    if (StringUtils.isBlank(value)) {
        return defaultValue;
    }
    String encoding = req.getParameter("encoding");
    return resolveValue(value, encoding);
}

nacos 的 server 與 client使用了http協議來交互,那麼在server端必定提供了http介面的入口,並且在core模塊看到其依賴了spring boot starter,所以它的http介面由集成了Spring的web伺服器支持,簡單地說就是像我們平時寫的業務服務一樣,有controller層和service層。

以OpenAPI作為入口來學習,我們找到/nacos/v1/ns/instance服務註冊介面,在nacos-naming工程中我們可以看到InstanceController正是我們要找的對象,如下圖:

file

處理服務註冊,我們直接找對應的POST方法即可,代碼如下:

/**
 * Register new instance.
 * 接收客戶端註冊信息
 * @param request http request
 * @return 'ok' if success
 * @throws Exception any error during register
 */
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    //獲取namespaceid,該參數是可選參數
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    //獲取服務名字
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    //校驗服務的名字,服務的名字格式為groupName@@serviceName
    NamingUtils.checkServiceNameFormat(serviceName);
    //創建實例
    final Instance instance = parseInstance(request);
    //註冊服務
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

如上圖,該方法主要用於接收客戶端註冊信息,並且會校驗參數是否存在問題,如果不存在問題就創建服務的實例,服務實例創建後將服務實例註冊到Nacos中,註冊的方法如下:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    //判斷本地緩存中是否存在該命名空間,如果不存在就創建,之後判斷該命名空間下是否
    //存在該服務,如果不存在就創建空的服務
    //如果實例為空,則創建實例,並且會將創建的實例存入到serviceMap集合中
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    //從serviceMap集合中獲取創建的實例
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    //服務註冊,這一步才會把服務的實例信息和服務綁定起來
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

註冊的方法中會先創建該實例對象,創建前先檢查本地緩存是否存在該實例對象,如果不存在就創建,最後註冊該服務,並且該服務會和實例信息捆綁到一起。

Distro協議介紹

Distro是阿裡巴巴的私有協議, 是一種分散式一致性演算法,目前流行的Nacos服務管理框架就採用了Distro協議。Distro 協議被定位為 臨時數據的一致性協議:該類型協議, 不需要把數據存儲到磁碟或者資料庫,因為臨時數據通常和伺服器保持一個session會話, 該會話只要存在,數據就不會丟失

Distro 協議保證寫必須永遠是成功的,即使可能會發生網路分區。當網路恢復時,把各數據分片的數據進行合併。

Distro 協議具有以下特點:

1:專門為了註冊中心而創造出的協議;
2:客戶端與服務端有兩個重要的交互,服務註冊與心跳發送;
3:客戶端以服務為維度向服務端註冊,註冊後每隔一段時間向服務端發送一次心跳,心跳包需要帶上註冊服務的全部信息,在客戶端看來,服務端節點對等,所以請求的節點是隨機的;
4:客戶端請求失敗則換一個節點重新發送請求;
5:服務端節點都存儲所有數據,但每個節點只負責其中一部分服務,在接收到客戶端的“寫”(註冊、心跳、下線等)請求後,服務端節點判斷請求的服務是否為自己負責,如果是,則處理,否則交由負責的節點處理;
6:每個服務端節點主動發送健康檢查到其他節點,響應的節點被該節點視為健康節點;
7:服務端在接收到客戶端的服務心跳後,如果該服務不存在,則將該心跳請求當做註冊請求來處理;
8:服務端如果長時間未收到客戶端心跳,則下線該服務;
9:負責的節點在接收到服務註冊、服務心跳等寫請求後將數據寫入後即返回,後臺非同步地將數據同步給其他節點;
10:節點在收到讀請求後直接從本機獲取後返回,無論數據是否為最新。

Distro定址

Distro協議主要用於nacos 服務端節點之間的相互發現,nacos使用定址機制來實現服務端節點的管理。在Nacos中,定址模式有三種:

單機模式(StandaloneMemberLookup)
文件模式(FileConfigMemberLookup)
伺服器模式(AddressServerMemberLookup)

三種定址模式如下圖:

file

1.2.3.1 單機模式

com.alibaba.nacos.core.cluster.lookup.LookupFactory中有創建定址方式,可以創建集群啟動方式、單機啟動方式,不同啟動方式就決定了不同定址模式,如果是集群啟動,

/**
 * Create the target addressing pattern.
 * 創建定址模式
 * @param memberManager {@link ServerMemberManager}
 * @return {@link MemberLookup}
 * @throws NacosException NacosException
 */
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
    //NacosServer 集群方式啟動
    if (!EnvUtil.getStandaloneMode()) {
        String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
        //由參數中傳入的定址方式得到LookupType對象
        LookupType type = chooseLookup(lookupType);
        //選擇定址方式
        LOOK_UP = find(type);
        //設置當前定址方式
        currentLookupType = type;
    } else {
        //NacosServer單機啟動
        LOOK_UP = new StandaloneMemberLookup();
    }
    LOOK_UP.injectMemberManager(memberManager);
    Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
    return LOOK_UP;
}


/***
 * 選擇定址方式
 * @param type
 * @return
 */
private static MemberLookup find(LookupType type) {
    //文件定址模式,也就是配置cluster.conf配置文件將多個節點串聯起來,
    // 通過配置文件尋找其他節點,以達到和其他節點通信的目的
    if (LookupType.FILE_CONFIG.equals(type)) {
        LOOK_UP = new FileConfigMemberLookup();
        return LOOK_UP;
    }
    //伺服器模式
    if (LookupType.ADDRESS_SERVER.equals(type)) {
        LOOK_UP = new AddressServerMemberLookup();
        return LOOK_UP;
    }
    // unpossible to run here
    throw new IllegalArgumentException();
}

單節點定址模式會直接創建StandaloneMemberLookup對象,而文件定址模式會創建FileConfigMemberLookup對象,伺服器定址模式會創建AddressServerMemberLookup

1.2.3.2 文件定址模式

file

文件定址模式主要在創建集群的時候,通過cluster.conf來配置集群,程式可以通過監聽cluster.conf文件變化實現動態管理節點,FileConfigMemberLookup源碼如下:

public class FileConfigMemberLookup extends AbstractMemberLookup {

    //創建文件監聽器
    private FileWatcher watcher = new FileWatcher() {

        //文件發生變更事件
        @Override
        public void onChange(FileChangeEvent event) {
            readClusterConfFromDisk();
        }
        //檢查context是否包含cluster.conf
        @Override
        public boolean interest(String context) {
            return StringUtils.contains(context, "cluster.conf");
        }
    };

    @Override
    public void start() throws NacosException {
        if (start.compareAndSet(false, true)) {
            readClusterConfFromDisk();
            // 使用inotify機制來監視文件更改,並自動觸發對cluster.conf的讀取
            try {
                WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
            } catch (Throwable e) {
                Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
            }
        }
    }

    @Override
    public void destroy() throws NacosException {
        WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);
    }

    private void readClusterConfFromDisk() {
        Collection<Member> tmpMembers = new ArrayList<>();
        try {
            List<String> tmp = EnvUtil.readClusterConf();
            tmpMembers = MemberUtil.readServerConf(tmp);
        } catch (Throwable e) {
            Loggers.CLUSTER
                    .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
        }

        afterLookup(tmpMembers);
    }
}
1.2.3.3 伺服器定址模式

使用地址伺服器存儲節點信息,會創建AddressServerMemberLookup,服務端定時拉取信息進行管理;

public class AddressServerMemberLookup extends AbstractMemberLookup {

    private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {
    };

    public String domainName;

    public String addressPort;

    public String addressUrl;

    public String envIdUrl;

    public String addressServerUrl;

    private volatile boolean isAddressServerHealth = true;

    private int addressServerFailCount = 0;

    private int maxFailCount = 12;

    private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);

    private volatile boolean shutdown = false;

    @Override
    public void start() throws NacosException {
        if (start.compareAndSet(false, true)) {
            this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));
            initAddressSys();
            run();
        }
    }

    /***
     * 獲取伺服器地址
     */
    private void initAddressSys() {
        String envDomainName = System.getenv("address_server_domain");
        if (StringUtils.isBlank(envDomainName)) {
            domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");
        } else {
            domainName = envDomainName;
        }
        String envAddressPort = System.getenv("address_server_port");
        if (StringUtils.isBlank(envAddressPort)) {
            addressPort = EnvUtil.getProperty("address.server.port", "8080");
        } else {
            addressPort = envAddressPort;
        }
        String envAddressUrl = System.getenv("address_server_url");
        if (StringUtils.isBlank(envAddressUrl)) {
            addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");
        } else {
            addressUrl = envAddressUrl;
        }
        addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
        envIdUrl = "http://" + domainName + ":" + addressPort + "/env";

        Loggers.CORE.info("ServerListService address-server port:" + addressPort);
        Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);
    }

    @SuppressWarnings("PMD.UndefineMagicConstantRule")
    private void run() throws NacosException {
        // With the address server, you need to perform a synchronous member node pull at startup
        // Repeat three times, successfully jump out
        boolean success = false;
        Throwable ex = null;
        int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);
        for (int i = 0; i < maxRetry; i++) {
            try {
                //拉取集群節點信息
                syncFromAddressUrl();
                success = true;
                break;
            } catch (Throwable e) {
                ex = e;
                Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
            }
        }
        if (!success) {
            throw new NacosException(NacosException.SERVER_ERROR, ex);
        }
        //創建定時任務
        GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);
    }

    @Override
    public void destroy() throws NacosException {
        shutdown = true;
    }

    @Override
    public Map<String, Object> info() {
        Map<String, Object> info = new HashMap<>(4);
        info.put("addressServerHealth", isAddressServerHealth);
        info.put("addressServerUrl", addressServerUrl);
        info.put("envIdUrl", envIdUrl);
        info.put("addressServerFailCount", addressServerFailCount);
        return info;
    }

    private void syncFromAddressUrl() throws Exception {
        RestResult<String> result = restTemplate
                .get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());
        if (result.ok()) {
            isAddressServerHealth = true;
            Reader reader = new StringReader(result.getData());
            try {
                afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
            } catch (Throwable e) {
                Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",
                        ExceptionUtil.getAllExceptionMsg(e));
            }
            addressServerFailCount = 0;
        } else {
            addressServerFailCount++;
            if (addressServerFailCount >= maxFailCount) {
                isAddressServerHealth = false;
            }
            Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());
        }
    }

    // 定時任務
    class AddressServerSyncTask implements Runnable {

        @Override
        public void run() {
            if (shutdown) {
                return;
            }
            try {
                //拉取服務列表
                syncFromAddressUrl();
            } catch (Throwable ex) {
                addressServerFailCount++;
                if (addressServerFailCount >= maxFailCount) {
                    isAddressServerHealth = false;
                }
                Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
            } finally {
                GlobalExecutor.scheduleByCommon(this, 5_000L);
            }
        }
    }
}

數據同步

Nacos數據同步分為全量同步和增量同步,所謂全量同步就是初始化數據一次性同步,而增量同步是指有數據增加的時候,只同步增加的數據。

全量同步

file

全量同步流程比較複雜,流程如上圖:

1:啟動一個定時任務線程DistroLoadDataTask載入數據,調用load()方法載入數據

2:調用loadAllDataSnapshotFromRemote()方法從遠程機器同步所有的數據

3:從namingProxy代理獲取所有的數據data

4:構造http請求,調用httpGet方法從指定的server獲取數據

5:從獲取的結果result中獲取數據bytes

6:處理數據processData

7:從data反序列化出datumMap

8:把數據存儲到dataStore,也就是本地緩存dataMap

9:監聽器不包括key,就創建一個空的service,並且綁定監聽器

10:監聽器listener執行成功後,就更新data store
任務啟動

com.alibaba.nacos.core.distributed.distro.DistroProtocol的構造函數中調用startDistroTask()方法,該方法會執行startVerifyTask()startLoadTask(),我們重點關註startLoadTask(),該方法代碼如下:

/***
 * 啟動DistroTask
 */
private void startDistroTask() {
    if (EnvUtil.getStandaloneMode()) {
        isInitialized = true;
        return;
    }
    //啟動startVerifyTask,做數據同步校驗
    startVerifyTask();
    //啟動DistroLoadDataTask,批量載入數據
    startLoadTask();
}

//啟動DistroLoadDataTask
private void startLoadTask() {
    //處理狀態回調對象
    DistroCallback loadCallback = new DistroCallback() {
        //處理成功
        @Override
        public void onSuccess() {
            isInitialized = true;
        }
        //處理失敗
        @Override
        public void onFailed(Throwable throwable) {
            isInitialized = false;
        }
    };
    //執行DistroLoadDataTask,是一個多線程
    GlobalExecutor.submitLoadDataTask(
            new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}

/***
 * 啟動startVerifyTask
 * 數據校驗
 */
private void startVerifyTask() {
    GlobalExecutor.schedulePartitionDataTimedSync(
        new DistroVerifyTask(
            memberManager,
            distroComponentHolder),
        distroConfig.getVerifyIntervalMillis());
}
數據如何執行載入

上面方法會調用DistroLoadDataTask對象,而該對象其實是個線程,因此會執行它的run方法,run方法會調用load()方法實現數據全量載入,代碼如下:

/***
 * 數據載入過程
 */
@Override
public void run() {
    try {
        //載入數據
        load();
        if (!checkCompleted()) {
            GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
        } else {
            loadCallback.onSuccess();
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
        }
    } catch (Exception e) {
        loadCallback.onFailed(e);
        Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
    }
}


/***
 * 載入數據,並同步
 * @throws Exception
 */
private void load() throws Exception {
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    //同步數據
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            //從遠程機器上同步所有數據
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}
數據同步

數據同步會通過Http請求從遠程伺服器獲取數據,並同步到當前服務的緩存中,執行流程如下:

1:loadAllDataSnapshotFromRemote()從遠程載入所有數據,並處理同步到本機

2:transportAgent.getDatumSnapshot()遠程載入數據,通過Http請求執行遠程載入

3:dataProcessor.processSnapshot()處理數據同步到本地

數據處理完整邏輯代碼如下:loadAllDataSnapshotFromRemote()方法

/***
 * 從遠程機器上同步所有數據
 */
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                resourceType, transportAgent, dataProcessor);
        return false;
    }
    //遍歷集群成員節點,不包括自己
    for (Member each : memberManager.allMembersWithoutSelf()) {
        try {
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            //從遠程節點載入數據,調用http請求介面: distro/datums;
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            //處理數據
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO
                    .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
                            result);
            if (result) {
                return true;
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}

遠程載入數據代碼如下:transportAgent.getDatumSnapshot()方法

/***
 * 從namingProxy代理獲取所有的數據data,從獲取的結果result中獲取數據bytes;
 * @param targetServer target server.
 * @return
 */
@Override
public DistroData getDatumSnapshot(String targetServer) {
    try {
        //從namingProxy代理獲取所有的數據data,從獲取的結果result中獲取數據bytes;
        byte[] allDatum = NamingProxy.getAllData(targetServer);
        //將數據封裝成DistroData
        return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
    } catch (Exception e) {
        throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);
    }
}


/**
 * Get all datum from target server.
 * NamingProxy.getAllData
 * 執行HttpGet請求,並獲取返回數據
 * @param server target server address
 * @return all datum byte array
 * @throws Exception exception
 */
public static byte[] getAllData(String server) throws Exception {
    //參數封裝
    Map<String, String> params = new HashMap<>(8);
    //組裝URL,並執行HttpGet請求,獲取結果集
    RestResult<String> result = HttpClient.httpGet(
            "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,
            new ArrayList<>(), params);
    //返回數據
    if (result.ok()) {
        return result.getData().getBytes();
    }
    throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "
            + result.getMessage());
}

處理數據同步到本地代碼如下:dataProcessor.processSnapshot()

/**
 * 數據處理並更新本地緩存
 * @param data
 * @return
 * @throws Exception
 */
private boolean processData(byte[] data) throws Exception {
    if (data.length > 0) {
        //從data反序列化出datumMap
        Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
        // 把數據存儲到dataStore,也就是本地緩存dataMap
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            dataStore.put(entry.getKey(), entry.getValue());
            //監聽器不包括key,就創建一個空的service,並且綁定監聽器
            if (!listeners.containsKey(entry.getKey())) {
                // pretty sure the service not exist:
                if (switchDomain.isDefaultInstanceEphemeral()) {
                    // create empty service
                    //創建一個空的service
                    Loggers.DISTRO.info("creating service {}", entry.getKey());
                    Service service = new Service();
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespaceId);
                    service.setGroupName(Constants.DEFAULT_GROUP);
                    // now validate the service. if failed, exception will be thrown
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();

                    // The Listener corresponding to the key value must not be empty
                    // 與鍵值對應的監聽器不能為空,這裡的監聽器類型是 ServiceManager
                    RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
                    if (Objects.isNull(listener)) {
                        return false;
                    }
                    //為空的綁定監聽器
                    listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                }
            }
        }

        //迴圈所有datumMap
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {

            if (!listeners.containsKey(entry.getKey())) {
                // Should not happen:
                Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                continue;
            }

            try {
                //執行監聽器的onChange監聽方法
                for (RecordListener listener : listeners.get(entry.getKey())) {
                    listener.onChange(entry.getKey(), entry.getValue().value);
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                continue;
            }

            // Update data store if listener executed successfully:
            // 監聽器listener執行成功後,就更新dataStore
            dataStore.put(entry.getKey(), entry.getValue());
        }
    }
    return true;
}

到此實現數據全量同步,其實全量同步最終封裝的協議還是Http。

增量同步

新增數據使用非同步廣播同步:

1:DistroProtocol 使用 sync() 方法接收增量數據

2:向其他節點發佈廣播任務
  調用 distroTaskEngineHolder 發佈延遲任務
  
3:調用 DistroDelayTaskProcessor.process() 方法進行任務投遞:將延遲任務轉換為非同步變更任務

4:執行變更任務 DistroSyncChangeTask.run() 方法:向指定節點發送消息
  調用 DistroHttpAgent.syncData() 方法發送數據
  調用 NamingProxy.syncData() 方法發送數據
  
5:異常任務調用 handleFailedTask() 方法進行處理
  調用 DistroFailedTaskHandler 處理失敗任務
  調用 DistroHttpCombinedKeyTaskFailedHandler 將失敗任務重新投遞成延遲任務。
增量數據入口

我們回到服務註冊,服務註冊的InstanceController.register()就是數據入口,它會調用ServiceManager.registerInstance(),執行數據同步的時候,調用addInstance(),在該方法中會執行DistroConsistencyServiceImpl.put(),該方法是增量同步的入口,會調用distroProtocol.sync()方法,代碼如下:

/***
 * 數據保存
 * @param key   key of data, this key should be globally unique
 * @param value value of data
 * @throws NacosException
 */
@Override
public void put(String key, Record value) throws NacosException {
    //將數據存入到dataStore中
    onPut(key, value);
    //使用distroProtocol同步數據
    distroProtocol.sync(
        new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),
        DataOperation.CHANGE,
        globalConfig.getTaskDispatchPeriod() / 2);
}

sync()方法會執行任務發佈,代碼如下:

public void sync(DistroKey distroKey, DataOperation action, long delay) {
    //向除了自己外的所有節點廣播
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        //從distroTaskEngineHolder獲取延時執行引擎,並將distroDelayTask任務添加進來
        //執行延時任務發佈
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}
增量同步操作

延遲任務對象我們可以從DistroTaskEngineHolder構造函數中得知是DistroDelayTaskProcessor,代碼如下:

/***
 * 構造函數指定任務處理器
 * @param distroComponentHolder
 */
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
    DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
    //指定任務處理器defaultDelayTaskProcessor
    delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}

它延遲執行的時候會執行process方法,該方法正是執行數據同步的地方,它會執行DistroSyncChangeTask任務,代碼如下:

/***
 * 任務處理過程
 * @param task     task.
 * @return
 */
@Override
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
        //將延遲任務變更成非同步任務,非同步任務對象是一個線程
        DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
        //將任務添加到NacosExecuteTaskExecuteEngine中,並執行
        distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
        return true;
    }
    return false;
}

DistroSyncChangeTask實質上是任務的開始,它自身是一個線程,所以會執行它的run方法,而run方法這是數據同步操作,代碼如下:

/***
 * 執行數據同步
 */
@Override
public void run() {
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    try {
        //獲取本地緩存數據
        String type = getDistroKey().getResourceType();
        DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
        distroData.setType(DataOperation.CHANGE);
        //向其他節點同步數據
        boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
        if (!result) {
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}

數據同步會執行調用syncData,該方法其實就是通過Http協議將數據發送到其他節點實現數據同步,代碼如下:

/***
 * 向其他節點同步數據
 * @param data         data
 * @param targetServer target server
 * @return
 */
@Override
public boolean syncData(DistroData data, String targetServer) {
    if (!memberManager.hasMember(targetServer)) {
        return true;
    }
    //獲取數據位元組數組
    byte[] dataContent = data.getContent();
    //通過Http協議同步數據
    return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}

最後:一定要跟著講師所給的源碼自行走一遍!!!

本文由傳智教育博學谷 - 狂野架構師教研團隊發佈
如果本文對您有幫助,歡迎關註和點贊;如果您有任何建議也可留言評論或私信,您的支持是我堅持創作的動力
轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 由於網上搜索 PowerJob MapReduce 都是設計原理,demo也展示個空殼子,沒有演示Map到Reduce結果怎麼傳遞,對於沒有MR開發經驗的人來說並沒有什麼幫助,所以這裡寫了一個有完整計算意義的demo供參考。 代碼功能: 實現一個sum累加。 任務輸入參數: batchSize=10 ...
  • 因為webman是常駐記憶體框架 當前進程初始化一次後就不會再初始化了 所以構造函數里傳遞request是不好用的。 這裡使用中間件來代替 瞭解中間件: 中間件一般用於攔截請求或者響應。例如執行控制器前統一驗證用戶身份,如用戶未登錄時跳轉到登錄頁面。例如響應中增加某個header頭。例如統計某個uri ...
  • 數組 java數組是一個容器,保存著一組值,當數組創建之後,數組的的長度就固定了。 1.數組的定義 1.聲明數組 int array=null; 聲明瞭數組之後,數組是空的,沒什麼實際意義 2.創建數組 ​ array=new[10]; 3.給元素中數組賦值 array[0]=0; 註:數組的下標是 ...
  • lab1 要求按照論文實現一個mapReduce 框架 lab1 :https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 論文:https://zhuanlan.zhihu.com/p/122571315 在mrsequential.go文件中有個單機版 ...
  • 作為一項古老的智力游戲,千百年來迷宮都散髮著迷人的魅力。但是,手工設計迷宮費時又耗(腦)力,於是,我們有必要製作一個程式:迷宮生成器…… 好吧,我編不下去了。但是,從上面的文字中,我們可以看出,我們此次的主題是:用Python實現一個迷宮生成器。 首先展示一下效果圖: 我們先分析一下所需的庫: 既然 ...
  • ReentrantLock 1 這篇還是接著ReentrantLock的公平鎖,沒看過第0篇的可以先去看上一篇https://www.cnblogs.com/sunankang/p/16456342.html 這篇就以問題為導向,先提出問題,然後根據問題去看代碼 確保能喚醒排隊的線程? A,B兩線程 ...
  • 輸入與輸出 讀取輸入 要想通過控制台進行輸入,首先需要構造一個與"標準輸入流"System.in關聯的Scanner對象。 // 創建輸入流對象 Scanner in = new Scanner(System.in); 現在,就可以使用Scanner類的各種方法讀取輸入了。例如,nextLine方法 ...
  • string常用庫函數 string的庫函數非常多,若全部掌握是非常耗時間的,但是我們只需要掌握常用,重要的庫函數即可,不常用的只需瞭解下即可,需要時,上C++標準官方庫查找。 這裡列舉出本篇說明的函數 insert、erase、swap、c_str、find、rfind、substr、getlin ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...