Nacos 動態配置原理 可憐夜半虛前席,不問蒼生問鬼神。 簡介 動態配置管理是 Nacos 的三大功能之一,通過動態配置服務,我們可以在所有環境中以集中和動態的方式管理所有應用程式或服務的配置信息。 動態配置中心可以實現配置更新時無需重新部署應用程式和服務即可使相應的配置信息生效,這極大了增加了系 ...
Nacos 動態配置原理
可憐夜半虛前席,不問蒼生問鬼神。
簡介
動態配置管理是 Nacos 的三大功能之一,通過動態配置服務,我們可以在所有環境中以集中和動態的方式管理所有應用程式或服務的配置信息。
動態配置中心可以實現配置更新時無需重新部署應用程式和服務即可使相應的配置信息生效,這極大了增加了系統的運維能力。
從Nacos 2.1.1 源碼中簡單瞭解其動態配置原理。
動態配置
下麵通過一個簡單的例子來瞭解下 Nacos 的動態配置的功能,看看 Nacos 是如何以簡單、優雅、高效的方式管理配置,實現配置的動態變更的。
環境準備
源碼獲取
首先我們要準備一個 Nacos 的服務端,這裡通過 Git 命令下載代碼資源包的方式獲取 Nacos 的服務端。
git clone https://github.com/alibaba/nacos.git
Git 命令下載Nacos服務端源碼
項目構建
把通過 Git 命令下載的源碼包導入 IDEA 中構建Nacos服務端項目,導入後 IDEA 後可以看到在項目目錄下有一個BUILDING文件,裡面有構建命令。
mvn -Prelease-nacos -Dmaven.test.skip=true clean install -U
項目構建指引
執行構建成功之後將會在控制台看到BUILD SUCCESS 相關INFO 列印。
構建成功
然後在項目的 distribution 模塊的 target 目錄下我們就可以找到可執行程式和兩個壓縮包,這兩個壓縮包就是nacos 的 github 官網上發佈的 Release 包。
- nacos-server-2.1.1.tar.gz
- nacos-server-2.1.1.zip
以及nacos 的可執行程式,即Windows 和 Linux 下的開啟和關閉命令。
- startup.sh
- shutdown.sh
- startup.cmd
- shutdown.cmd
- 接下來我們把編譯好的兩個壓縮包拷貝出來,然後解壓出來直接使用,這樣就相當於我們在官網上下載了 Release 包了。
- 解壓後文件結構和 nacos-server-2.1.1 一樣,我們直接執行 startup.sh 即可啟動一個單機的 Nacos 服務端了。
- 前面這些環境準備的步驟,如果不需要修改nacos源碼,完全可以直接在網上下載Nacos 的Release 包,解壓後即可啟動運行nacos。
啟動服務端
當前安裝的nacos版本:Nacos 2.1.1。
啟動命令
解壓後CMD到bin 目錄下執行啟動命令來啟動一個 Nacos 服務端,Window系統直接雙擊 startup.cmd 即可。
可執行文件-Windows
啟動報錯
org.springframework.context.ApplicationContextException: Unable to start web server; nested exception is org.springframework.boot.web.server.WebServerException: Unable to start embedded Tomcat
startup.cmd 啟動報錯
修複啟動報錯
nacos 預設的啟動方式是集群啟動,單機使用集群啟動配置就會導致啟動報錯。
set MODE="cluster"
編輯 startup.cmd 可執行文件,修改啟動模式
set MODE="standalone"
編輯 startup.cmd 可執行文件
如果是非Windows 環境下運行就不會有這個問題,可以直接指定啟動方式。
sh startup.sh -m standalone
啟動完將會看到 INFO Nacos started successfully 相關列印。
startup.cmd 啟動成功
登錄
啟動成功後,我們就可以在瀏覽器訪問 Nacos 的控制台了,訪問地址:http://localhost:8848/nacos/index.html。
Nacos首頁訪問
新版的nacos在首頁登錄界面加上了這個亮眼的標題:內部系統,不可暴露到公網,看代碼提交記錄是2021年2月份加的。
下載了Nacos源碼這些樣式我們也都可以自己的需求修改為自己想要的效果。
通過查看登錄介面,訪問地址:http://localhost:8848/nacos/v1/auth/users/login。
nacos登錄介面
- nacos登錄介面的許可權控制有一個預設的賬號和密碼都是 nacos,也可以改為ldap。
- 就看application.properties 中的配置nacos.core.auth.system.type=nacos 當前登錄用戶了。
- 預設是的賬號和密碼都是:nacos/nacos。
登錄進去之後,可以看到空白配置列表和nacos預設賬戶信息。
啟動客戶端
創建 ConfigService連接
當服務端以及配置項都準備好之後,就可以創建客戶端了,如下圖所示新建一個 Nacos 的 ConfigService 來接收數據。
新建配置
接下來我們在控制臺上創建一個簡單的配置項,如下圖所示。
配置發佈後,可以在客戶端後臺看到列印如下信息:
修改配置信息
接下來我們在 Nacos 的控制臺上將我們的配置信息改為如下圖所示:
修改完配置,點擊 “發佈” 按鈕後,客戶端將會收到最新的數據,如下圖所示:
到此為止,一個簡單的動態配置管理功能已經走完一遍了。
動態配置源碼分析
從我們的 demo 中可以知道,我們首先是創建了一個 ConfigService。而 ConfigService 是通過 ConfigFactory 類創建的,如下圖所示:
上面是通過main 方法創建測試的客戶端,實際上同步配置初始化流程是由NacosConfigManager 管理。
在 NacosConfigAutoConfiguration 配置類中:
1 @Bean 2 public NacosConfigManager nacosConfigManager( NacosConfigProperties nacosConfigProperties) { 3 return new NacosConfigManager(nacosConfigProperties); 4 }View Code
NacosConfigManager 持有:ConfigService(配置相關操作)、NacosConfigProperties(Spring Boot 對配置中心的配置)。
1 public class NacosConfigManager { 2 private static ConfigService service = null; 3 private NacosConfigProperties nacosConfigProperties; 4 5 public NacosConfigManager(NacosConfigProperties nacosConfigProperties) { 6 this.nacosConfigProperties = nacosConfigProperties; 7 createConfigService(nacosConfigProperties); 8 } 9 10 static ConfigService createConfigService( 11 NacosConfigProperties nacosConfigProperties) { 12 if (Objects.isNull(service)) { 13 // 雙重加鎖 防止創建了多個 NacosConfigManager 14 synchronized (NacosConfigManager.class) { 15 try { 16 if (Objects.isNull(service)) { 17 // 通過反射構造函數創建了 NacosService 的子類 18 // NacosConfigService(Properties properties) 19 service = NacosFactory.createConfigService( 20 nacosConfigProperties.assembleConfigServiceProperties()); 21 } 22 } 23 // ………… 24 } 25 } 26 return service; 27 } 28 // ………… 29 }View Code
實例化 ConfigService
1 public NacosConfigService(Properties properties) throws NacosException { 2 ValidatorUtils.checkInitParam(properties); 3 // 初始化 命名空間,放到 properties 中 4 initNamespace(properties); 5 // 設置請求過濾器 6 this.configFilterChainManager = new ConfigFilterChainManager(properties); 7 // 設置伺服器名稱列表的線程任務 8 ServerListManager serverListManager = new ServerListManager(properties); 9 serverListManager.start(); 10 // 實例化主要初始化對象1: ClientWorker(MVP選手) 11 this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties); 12 // 實例化主要初始化對象2: ServerHttpAgent 13 // will be deleted in 2.0 later versions 14 agent = new ServerHttpAgent(serverListManager); 15 16 }View Code
ClientWorker 構造函數
1 @SuppressWarnings("PMD.ThreadPoolCreationRule") 2 public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager, 3 final Properties properties) throws NacosException { 4 // 設置請求過濾器 5 this.configFilterChainManager = configFilterChainManager; 6 // 初始化超時配置參數 7 init(properties); 8 // 創建 Grpc 請求類 9 agent = new ConfigRpcTransportClient(properties, serverListManager); 10 // 核心線程數 count == 1 11 int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE); 12 /** 13 * 創建具有定時執行功能的單線程池,用於定時執行 checkConfigInfo 方法 14 * 即該線程任務用於同步配置 15 */ 16 ScheduledExecutorService executorService = Executors 17 .newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> { 18 Thread t = new Thread(r); 19 // 設置線程名稱 20 t.setName("com.alibaba.nacos.client.Worker"); 21 // 設置為守護線程,在主線程關閉後無需手動關閉守護線程,該線程會自動關閉 22 t.setDaemon(true); 23 return t; 24 }); 25 agent.setExecutor(executorService); 26 // 啟動線程 處於就緒狀態,主要處理 startInternal 方法 27 agent.start(); 28 29 }View Code
ConfigRpcTransportClient
agent.start() 的 startInternal()
ConfigRpcTransportClient 的父類為 ConfigTransportClient。
1 @Override 2 public void startInternal() { 3 executor.schedule(() -> { 4 /** 5 * 啟動線程任務,通過 while(true) 方式一直迴圈。 6 */ 7 while (!executor.isShutdown() && !executor.isTerminated()) { 8 try { 9 /** 10 * 獲取隊列頭部元素,如果獲取不到則等待5s,Nacos 通過這種方式來控制迴圈間隔 11 * Nacos 還可以通過調用 notifyListenConfig() 向 listenExecutebell 設置元素的方式,來立即執行 executeConfigListen() 方法 12 */ 13 listenExecutebell.poll(5L, TimeUnit.SECONDS); 14 if (executor.isShutdown() || executor.isTerminated()) { 15 continue; 16 } 17 executeConfigListen(); 18 } catch (Exception e) { 19 LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e); 20 } 21 } 22 }, 0L, TimeUnit.MILLISECONDS); 23 24 }View Code
到此處同步配置的初始化流程就完成了,我們繼續看同步配置的過程。
客戶端同步配置
同步配置的邏輯主要在 executeConfigListen()
方法中,這段方法比較長,需要耐心的分開來看。
1 @Override 2 public void executeConfigListen() { 3 // 有監聽組 4 Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16); 5 // 無監聽組 6 Map<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16); 7 // 系統當前時間 8 long now = System.currentTimeMillis(); 9 /** 10 * 判斷是否到全量同步時間 11 * 分鐘執行一次全量同步。 5 minutes to check all listen cache keys ,ALL_SYNC_INTERNAL == 5 * 60 * 1000L 12 * 當前時間 - 上次同步時間 是否大於等於 五分鐘 13 */ 14 boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL; 15 // 遍歷本地 CacheData Map, CacheData 保存了 Nacos 配置基本信息,配置的監聽器等基礎信息。 16 for (CacheData cache : cacheMap.get().values()) { 17 synchronized (cache) { 18 //check local listeners consistent. 19 /** 20 * 首先判斷,該 cacheData 是否需要檢查。也就是如果為 isSyncWithServer == false,必定進行檢查。 isSyncWithServer 預設為 false 21 * 1.添加listener.default為false;需要檢查。 22 * 2.接收配置更改通知,設置為false;需要檢查。 23 * 3.last listener被移除,設置為false;需要檢查 24 */ 25 if (cache.isSyncWithServer()) { 26 /** 27 * 執行 CacheData.Md5 與 Listener.md5的比對與設定 28 * 即本地檢查 checkListenerMd5 如果不相同-配置有變化,則進行監聽器的回調。 29 * 跟蹤 LocalConfigInfoProcessor 方法可以查看Nacos 將配置信息保存在哪裡 30 * nacos 配置保存路徑:System.getProperty("JM.LOG.PATH", System.getProperty("user.home")) + File.separator + "nacos" + File.separator + "config"; 31 * C:\Users\01421603\nacos\config\fixed-localhost_8848_nacos\snapshot\DEFAULT_GROUP 32 */ 33 cache.checkListenerMd5(); 34 if (!needAllSync) { 35 // 是否需要全量同步,如果未達到全量同步時間即距上次全量同步小於五分鐘,則跳過這個 cacheData,即本次迴圈的nacos配置無需更換 36 continue; 37 } 38 } 39 // 本地nacos配置信息 監聽器不為空 走這 40 if (!CollectionUtils.isEmpty(cache.getListeners())) { 41 //get listen config ,是否啟用本地監聽配置 isUseLocalConfig 預設 == false 42 if (!cache.isUseLocalConfigInfo()) { 43 // 有監聽器的放入 listenCachesMap 44 List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); 45 if (cacheDatas == null) { 46 cacheDatas = new LinkedList<>(); 47 listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); 48 } 49 cacheDatas.add(cache); 50 51 } 52 // 本地nacos配置信息 監聽器為空 走這 53 } else if (CollectionUtils.isEmpty(cache.getListeners())) { 54 if (!cache.isUseLocalConfigInfo()) { 55 // 沒有監聽器的放入 removeListenCachesMap 56 List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); 57 if (cacheDatas == null) { 58 cacheDatas = new LinkedList<>(); 59 removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); 60 } 61 cacheDatas.add(cache); 62 63 } 64 } 65 } 66 67 } 68 // 標誌是否有更改的配置,預設為 false 69 boolean hasChangedKeys = false; 70 // 有監聽組配置信息 非空 71 if (!listenCachesMap.isEmpty()) { 72 for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) { 73 String taskId = entry.getKey(); 74 Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2); 75 76 List<CacheData> listenCaches = entry.getValue(); 77 for (CacheData cacheData : listenCaches) { 78 timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant), 79 cacheData.getLastModifiedTs().longValue()); 80 } 81 // 構建監聽器請求 82 ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches); 83 configChangeListenRequest.setListen(true); 84 try { 85 // 初始化 RpcClient 客戶端 86 RpcClient rpcClient = ensureRpcClient(taskId); 87 /** 88 * 發送請求向 Nacos Server 添加配置變化監聽器 89 * ConfigChangeBatchListenResponse 服務端將返回有變化的 dataId、group、tenant 90 */ 91 ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy( 92 rpcClient, configChangeListenRequest); 93 if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) { 94 95 Set<String> changeKeys = new HashSet<>(); 96 //handle changed keys,notify listener 97 // 處理有變化的配置 98 if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) { 99 hasChangedKeys = true; 100 for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse 101 .getChangedConfigs()) { 102 String changeKey = GroupKey 103 .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), 104 changeConfig.getTenant()); 105 changeKeys.add(changeKey); 106 boolean isInitializing = cacheMap.get().get(changeKey).isInitializing(); 107 /** 108 * 刷新上下文 109 * 此處將請求 Nacos Server ,獲取最新配置內容,並觸發 Listener 的回調。 110 */ 111 refreshContentAndCheck(changeKey, !isInitializing); 112 } 113 114 } 115 116 //handler content configs 117 for (CacheData cacheData : listenCaches) { 118 String groupKey = GroupKey 119 .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant()); 120 // 如果返回的 changeKeys 中,未包含此 groupKey。則說明此內容未發生變化。 121 if (!changeKeys.contains(groupKey)) { 122 //sync:cache data md5 = server md5 && cache data md5 = all listeners md5. 123 synchronized (cacheData) { 124 if (!cacheData.getListeners().isEmpty()) { 125 126 Long previousTimesStamp = timestampMap.get(groupKey); 127 if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp, 128 System.currentTimeMillis())) { 129 continue; 130 } 131 // 則將同步標誌設為 true 132 cacheData.setSyncWithServer(true); 133 } 134 } 135 } 136 // 將初始化狀態設置 false 137 cacheData.setInitializing(false); 138 } 139 140 } 141 } catch (Exception e) { 142 143 LOGGER.error("Async listen config change error ", e); 144 try { 145 Thread.sleep(50L); 146 } catch (InterruptedException interruptedException) { 147 //ignore 148 } 149 } 150 } 151 } 152 153 /** 154 * 處理無監聽器的 CacheData 155 * 無監聽器的 CacheData 就是,從 Nacos Client 與 Nacos Server 中移除掉原有的監聽器。 156 */ 157 if (!removeListenCachesMap.isEmpty()) { 158 for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) { 159 String taskId = entry.getKey(); 160 List<CacheData> removeListenCaches = entry.getValue(); 161 ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches); 162 configChangeListenRequest.setListen(false); 163 try { 164 RpcClient rpcClient = ensureRpcClient(taskId); 165 boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest); 166 if (removeSuccess) { 167 for (CacheData cacheData : removeListenCaches) { 168 synchronized (cacheData) { 169 if (cacheData.getListeners().isEmpty()) { 170 ClientWorker.this 171 .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); 172 } 173 } 174 } 175 } 176 177 } catch (Exception e) { 178 LOGGER.error("async remove listen config change error ", e); 179 } 180 try { 181 Thread.sleep(50L); 182 } catch (InterruptedException interruptedException) { 183 //ignore 184 } 185 } 186 } 187 188 if (needAllSync) { 189 lastAllSyncTime = now; 190 } 191