Nacos配置中心集群原理及源碼分析

来源:https://www.cnblogs.com/mic112/archive/2022/03/29/16071963.html
-Advertisement-
Play Games

Nacos作為配置中心,必然需要保證服務節點的高可用性,那麼Nacos是如何實現集群的呢? 下麵這個圖,表示Nacos集群的部署圖。 Nacos集群工作原理 Nacos作為配置中心的集群結構中,是一種無中心化節點的設計,由於沒有主從節點,也沒有選舉機制,所以為了能夠實現熱備,就需要增加虛擬IP(VI ...


Nacos作為配置中心,必然需要保證服務節點的高可用性,那麼Nacos是如何實現集群的呢?

下麵這個圖,表示Nacos集群的部署圖。

image-20211130193537901

Nacos集群工作原理

Nacos作為配置中心的集群結構中,是一種無中心化節點的設計,由於沒有主從節點,也沒有選舉機制,所以為了能夠實現熱備,就需要增加虛擬IP(VIP)。

Nacos的數據存儲分為兩部分

  1. Mysql資料庫存儲,所有Nacos節點共用同一份數據,數據的副本機制由Mysql本身的主從方案來解決,從而保證數據的可靠性。
  2. 每個節點的本地磁碟,會保存一份全量數據,具體路徑:/data/program/nacos-1/data/config-data/${GROUP}.

在Nacos的設計中,Mysql是一個中心數據倉庫,且認為在Mysql中的數據是絕對正確的。 除此之外,Nacos在啟動時會把Mysql中的數據寫一份到本地磁碟。

這麼設計的好處是可以提高性能,當客戶端需要請求某個配置項時,服務端會想Ian從磁碟中讀取對應文件返回,而磁碟的讀取效率要比資料庫效率高。

當配置發生變更時:

  1. Nacos會把變更的配置保存到資料庫,然後再寫入本地文件。
  2. 接著發送一個HTTP請求,給到集群中的其他節點,其他節點收到事件後,從Mysql中dump剛剛寫入的數據到本地文件中。

另外,NacosServer啟動後,會同步啟動一個定時任務,每隔6小時,會dump一次全量數據到本地文件

配置變更同步入口

當配置發生修改、刪除、新增操作時,通過發佈一個notifyConfigChange事件。

@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
    
   //省略..
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
        }
    }//省略
    return true;
}

AsyncNotifyService

配置數據變更事件,專門有一個監聽器AsyncNotifyService,它會處理數據變更後的同步事件。

@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    
    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // Register A Subscriber to subscribe ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        
        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表
                
                // 構建NotifySingleTask,並添加到隊列中。
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (Member member : ipList) { //遍歷集群中的每個節點
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                            evt.isBeta));
                }
                //非同步執行任務 AsyncTask
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

AsyncTask

@Override
public void run() {
    executeAsyncInvoke();
}

private void executeAsyncInvoke() {
    while (!queue.isEmpty()) {//遍歷隊列中的數據,直到數據為空
        NotifySingleTask task = queue.poll(); //獲取task
        String targetIp = task.getTargetIP(); //獲取目標ip
        
        if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目標ip
            // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
            //判斷目標ip的健康狀態
            boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //
            if (unHealthNeedDelay) { //如果目標服務是非健康,則繼續添加到隊列中,延後再執行。
                // target ip is unhealthy, then put it in the notification list
                ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                        task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                        0, task.target);
                // get delay time and set fail count to the task
                asyncTaskExecute(task);
            } else {
                //構建header
                Header header = Header.newInstance();
                header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                if (task.isBeta) {
                    header.addParam("isBeta", "true");
                }
                AuthHeaderUtil.addIdentityToHeader(header);
                //通過restTemplate發起遠程調用,如果調用成功,則執行AsyncNotifyCallBack的回調方法
                restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
            }
        }
    }
}

目標節點接收請求

數據同步的請求地址為,task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
        @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) {
    dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        //
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}

dumpService.dump用來實現配置的更新,代碼如下

當前任務會被添加到DumpTaskMgr中管理。

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
        boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

TaskManager.addTask, 先調用父類去完成任務添加。

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    super.addTask(key, newTask);
    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}

在這種場景設計中,一般都會採用生產者消費者模式來完成,因此這裡不難猜測到,任務會被保存到一個隊列中,然後有另外一個線程來執行。

NacosDelayTaskExecuteEngine

TaskManager的父類是NacosDelayTaskExecuteEngine,

這個類中有一個成員屬性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,專門來保存延期執行的任務類型AbstractDelayTask.

在這個類的構造方法中,初始化了一個延期執行的任務,其中具體的任務是ProcessRunnable.

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

ProcessRunnable

private class ProcessRunnable implements Runnable {
    
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}

processTasks

protected void processTasks() {
    //獲取所有的任務
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        //獲取任務處理器,這裡返回的是DumpProcessor
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            //執行具體任務
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

DumpProcessor.process

讀取資料庫的最新數據,然後更新本地緩存和磁碟。

版權聲明:本博客所有文章除特別聲明外,均採用 CC BY-NC-SA 4.0 許可協議。轉載請註明來自 Mic帶你學架構
如果本篇文章對您有幫助,還請幫忙點個關註和贊,您的堅持是我不斷創作的動力。歡迎關註「跟著Mic學架構」公眾號公眾號獲取更多技術乾貨!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 練習: 存儲學生對象並遍歷,創建TreeSet集合使用無參構造方法,並按照年齡從小到大的順序排序,若年齡相同再按照姓名的字母順序排序 分析: 1.創建學生類,成員變數name,age;無參構造,帶參構造;get\set方法; 2.創建測試類,添加數據併進行排序;直接排序會報錯 3.需要Student ...
  • 過濾器就是過濾條件,對已經定位到數組中的 DOM 對象進行過濾篩選,過濾條件不能獨立出現在 jquery 函數,如果使用只能出現在選擇器後方 ...
  • 背景 很多時候,我們項目在開發環境和生成環境的環境配置是不一樣的,例如,資料庫配置,在開發的時候,我們一般用測試資料庫,而在生產環境的時候,我們是用正式的數據,這時候,我們可以利用profile在不同的環境下配置用不同的配置文件或者不同的配置。 解決方案 spring boot允許你通過命名約定按照 ...
  • 程式的異常:Throwable * 嚴重問題:Error 我們不處理。這種問題一般都是很嚴重的,比如說記憶體溢出。 * 問題:Exception * A:編譯期問題:不是RuntimeException的異常 必須進行處理的,因為你不處理,編譯就不能通過。 * B:運行期問題:RuntimeExcep ...
  • 一、前言 掃描件一直受大眾青睞,任何紙質資料在掃描之後進行存檔,想使用時手機就能打開,省心省力。但是掃描件的優點也恰恰造成了它的一個缺點,因為是通過電子設備掃描,所以出來的是圖像,如果想要處理文件上的內容,直接操作是無法實現的。 那要是想要引用其中的內容怎麼辦呢?別擔心,Python幫你解決問題。 ...
  • 一、序言 Spring Cache是Spring體系下標準化緩存框架。Spring Cache有如下優勢: 緩存品種多 支持緩存品種多,常見緩存Redis、EhCache、Caffeine均支持。它們之間既能獨立使用,也能組合使用。 平滑遷移 Spring內部支持的緩存,可實現無縫平滑遷移,無需修改 ...
  • 首先 相信有很多小伙伴都喜歡玩抖音吧,最近抖音張同學突然火了,兩個月漲粉一千多萬。看了他的視頻,滿滿的生活氣息,讓人有一種家的感覺。這就讓我很感興趣了,必須得用Python對他分析一下。 今天這篇文章,我抓取了張同學的視頻的評論數據,想從文本分析的角度,挖掘一下大家對張同學感興趣的點。 張同學 10 ...
  • 內容概要 web 開發模式 API 介面 postman 測試軟體的使用 restful 規範 drf 的安裝與使用 cbv 的 View 源碼分析 APIView 源碼分析 drf 的 Request 類 drf 的 APIView 類執行過程 內容詳細 web 開發模式 1、前後端不分離 在開發 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...