緊接上文,我們分析了Nacos的客戶端代碼, 今天我們再來試一下服務端 ,至此就可以Nacos源碼就告一段落,歡迎大家品鑒。 nacos服務端 註冊中心服務端的主要功能包括,接收客戶端的服務註冊,服務發現,服務下線的功能,但是除了這些和客戶端的交互之外,服務端還要做一些更重要的事情,就是我們常常會在 ...
緊接上文,我們分析了Nacos的客戶端代碼,
今天我們再來試一下服務端 ,至此就可以Nacos源碼就告一段落,歡迎大家品鑒。
nacos服務端
註冊中心服務端的主要功能包括,接收客戶端的服務註冊,服務發現,服務下線的功能,但是除了這些和客戶端的交互之外,服務端還要做一些更重要的事情,就是我們常常會在分散式系統中聽到的AP和CP,作為一個集群,nacos即實現了AP也實現了CP,其中AP使用的自己實現的Distro協議,而CP是採用raft協議實現的,這個過程中牽涉到心跳、選主等操作。
我們來學習一下註冊中心服務端接收客戶端服務註冊的功能。
註冊處理
我們先來學習一下Nacos的工具類WebUtils
,該工具類在nacos-core
工程下,該工具類是用於處理請求參數轉化的,裡面提供了2個常被用到的方法required()
和optional()
:
required方法通過參數名key,解析HttpServletRequest請求中的參數,並轉碼為UTF-8編碼。
optional方法在required方法的基礎上增加了預設值,如果獲取不到,則返回預設值。
代碼如下:
/**
* required方法通過參數名key,解析HttpServletRequest請求中的參數,並轉碼為UTF-8編碼。
*/
public static String required(final HttpServletRequest req, final String key) {
String value = req.getParameter(key);
if (StringUtils.isEmpty(value)) {
throw new IllegalArgumentException("Param '" + key + "' is required.");
}
String encoding = req.getParameter("encoding");
return resolveValue(value, encoding);
}
/**
* optional方法在required方法的基礎上增加了預設值,如果獲取不到,則返回預設值。
*/
public static String optional(final HttpServletRequest req, final String key, final String defaultValue) {
if (!req.getParameterMap().containsKey(key) || req.getParameterMap().get(key)[0] == null) {
return defaultValue;
}
String value = req.getParameter(key);
if (StringUtils.isBlank(value)) {
return defaultValue;
}
String encoding = req.getParameter("encoding");
return resolveValue(value, encoding);
}
nacos 的 server 與 client
使用了http
協議來交互,那麼在server
端必定提供了http
介面的入口,並且在core
模塊看到其依賴了spring boot starter
,所以它的http介面由集成了Spring的web伺服器支持,簡單地說就是像我們平時寫的業務服務一樣,有controller層和service層。
以OpenAPI作為入口來學習,我們找到/nacos/v1/ns/instance
服務註冊介面,在nacos-naming
工程中我們可以看到InstanceController
正是我們要找的對象,如下圖:
處理服務註冊,我們直接找對應的POST方法即可,代碼如下:
/**
* Register new instance.
* 接收客戶端註冊信息
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//獲取namespaceid,該參數是可選參數
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//獲取服務名字
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//校驗服務的名字,服務的名字格式為groupName@@serviceName
NamingUtils.checkServiceNameFormat(serviceName);
//創建實例
final Instance instance = parseInstance(request);
//註冊服務
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
如上圖,該方法主要用於接收客戶端註冊信息,並且會校驗參數是否存在問題,如果不存在問題就創建服務的實例,服務實例創建後將服務實例註冊到Nacos中,註冊的方法如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//判斷本地緩存中是否存在該命名空間,如果不存在就創建,之後判斷該命名空間下是否
//存在該服務,如果不存在就創建空的服務
//如果實例為空,則創建實例,並且會將創建的實例存入到serviceMap集合中
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//從serviceMap集合中獲取創建的實例
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//服務註冊,這一步才會把服務的實例信息和服務綁定起來
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
註冊的方法中會先創建該實例對象,創建前先檢查本地緩存是否存在該實例對象,如果不存在就創建,最後註冊該服務,並且該服務會和實例信息捆綁到一起。
Distro協議介紹
Distro是阿裡巴巴的私有協議, 是一種分散式一致性演算法,目前流行的Nacos服務管理框架就採用了Distro協議。Distro 協議被定位為 臨時數據的一致性協議:該類型協議, 不需要把數據存儲到磁碟或者資料庫,因為臨時數據通常和伺服器保持一個session會話, 該會話只要存在,數據就不會丟失 。
Distro 協議保證寫必須永遠是成功的,即使可能會發生網路分區。當網路恢復時,把各數據分片的數據進行合併。
Distro 協議具有以下特點:
1:專門為了註冊中心而創造出的協議;
2:客戶端與服務端有兩個重要的交互,服務註冊與心跳發送;
3:客戶端以服務為維度向服務端註冊,註冊後每隔一段時間向服務端發送一次心跳,心跳包需要帶上註冊服務的全部信息,在客戶端看來,服務端節點對等,所以請求的節點是隨機的;
4:客戶端請求失敗則換一個節點重新發送請求;
5:服務端節點都存儲所有數據,但每個節點只負責其中一部分服務,在接收到客戶端的“寫”(註冊、心跳、下線等)請求後,服務端節點判斷請求的服務是否為自己負責,如果是,則處理,否則交由負責的節點處理;
6:每個服務端節點主動發送健康檢查到其他節點,響應的節點被該節點視為健康節點;
7:服務端在接收到客戶端的服務心跳後,如果該服務不存在,則將該心跳請求當做註冊請求來處理;
8:服務端如果長時間未收到客戶端心跳,則下線該服務;
9:負責的節點在接收到服務註冊、服務心跳等寫請求後將數據寫入後即返回,後臺非同步地將數據同步給其他節點;
10:節點在收到讀請求後直接從本機獲取後返回,無論數據是否為最新。
Distro定址
Distro協議主要用於nacos 服務端節點之間的相互發現,nacos使用定址機制來實現服務端節點的管理。在Nacos中,定址模式有三種:
單機模式(StandaloneMemberLookup)
文件模式(FileConfigMemberLookup)
伺服器模式(AddressServerMemberLookup)
三種定址模式如下圖:
1.2.3.1 單機模式
在com.alibaba.nacos.core.cluster.lookup.LookupFactory
中有創建定址方式,可以創建集群啟動方式、單機啟動方式,不同啟動方式就決定了不同定址模式,如果是集群啟動,
/**
* Create the target addressing pattern.
* 創建定址模式
* @param memberManager {@link ServerMemberManager}
* @return {@link MemberLookup}
* @throws NacosException NacosException
*/
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
//NacosServer 集群方式啟動
if (!EnvUtil.getStandaloneMode()) {
String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
//由參數中傳入的定址方式得到LookupType對象
LookupType type = chooseLookup(lookupType);
//選擇定址方式
LOOK_UP = find(type);
//設置當前定址方式
currentLookupType = type;
} else {
//NacosServer單機啟動
LOOK_UP = new StandaloneMemberLookup();
}
LOOK_UP.injectMemberManager(memberManager);
Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
return LOOK_UP;
}
/***
* 選擇定址方式
* @param type
* @return
*/
private static MemberLookup find(LookupType type) {
//文件定址模式,也就是配置cluster.conf配置文件將多個節點串聯起來,
// 通過配置文件尋找其他節點,以達到和其他節點通信的目的
if (LookupType.FILE_CONFIG.equals(type)) {
LOOK_UP = new FileConfigMemberLookup();
return LOOK_UP;
}
//伺服器模式
if (LookupType.ADDRESS_SERVER.equals(type)) {
LOOK_UP = new AddressServerMemberLookup();
return LOOK_UP;
}
// unpossible to run here
throw new IllegalArgumentException();
}
單節點定址模式會直接創建StandaloneMemberLookup
對象,而文件定址模式會創建FileConfigMemberLookup
對象,伺服器定址模式會創建AddressServerMemberLookup
;
1.2.3.2 文件定址模式
文件定址模式主要在創建集群的時候,通過cluster.conf
來配置集群,程式可以通過監聽cluster.conf
文件變化實現動態管理節點,FileConfigMemberLookup
源碼如下:
public class FileConfigMemberLookup extends AbstractMemberLookup {
//創建文件監聽器
private FileWatcher watcher = new FileWatcher() {
//文件發生變更事件
@Override
public void onChange(FileChangeEvent event) {
readClusterConfFromDisk();
}
//檢查context是否包含cluster.conf
@Override
public boolean interest(String context) {
return StringUtils.contains(context, "cluster.conf");
}
};
@Override
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
readClusterConfFromDisk();
// 使用inotify機制來監視文件更改,並自動觸發對cluster.conf的讀取
try {
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
} catch (Throwable e) {
Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
}
}
}
@Override
public void destroy() throws NacosException {
WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);
}
private void readClusterConfFromDisk() {
Collection<Member> tmpMembers = new ArrayList<>();
try {
List<String> tmp = EnvUtil.readClusterConf();
tmpMembers = MemberUtil.readServerConf(tmp);
} catch (Throwable e) {
Loggers.CLUSTER
.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
}
afterLookup(tmpMembers);
}
}
1.2.3.3 伺服器定址模式
使用地址伺服器存儲節點信息,會創建AddressServerMemberLookup
,服務端定時拉取信息進行管理;
public class AddressServerMemberLookup extends AbstractMemberLookup {
private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {
};
public String domainName;
public String addressPort;
public String addressUrl;
public String envIdUrl;
public String addressServerUrl;
private volatile boolean isAddressServerHealth = true;
private int addressServerFailCount = 0;
private int maxFailCount = 12;
private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);
private volatile boolean shutdown = false;
@Override
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));
initAddressSys();
run();
}
}
/***
* 獲取伺服器地址
*/
private void initAddressSys() {
String envDomainName = System.getenv("address_server_domain");
if (StringUtils.isBlank(envDomainName)) {
domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");
} else {
domainName = envDomainName;
}
String envAddressPort = System.getenv("address_server_port");
if (StringUtils.isBlank(envAddressPort)) {
addressPort = EnvUtil.getProperty("address.server.port", "8080");
} else {
addressPort = envAddressPort;
}
String envAddressUrl = System.getenv("address_server_url");
if (StringUtils.isBlank(envAddressUrl)) {
addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");
} else {
addressUrl = envAddressUrl;
}
addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;
envIdUrl = "http://" + domainName + ":" + addressPort + "/env";
Loggers.CORE.info("ServerListService address-server port:" + addressPort);
Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);
}
@SuppressWarnings("PMD.UndefineMagicConstantRule")
private void run() throws NacosException {
// With the address server, you need to perform a synchronous member node pull at startup
// Repeat three times, successfully jump out
boolean success = false;
Throwable ex = null;
int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);
for (int i = 0; i < maxRetry; i++) {
try {
//拉取集群節點信息
syncFromAddressUrl();
success = true;
break;
} catch (Throwable e) {
ex = e;
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
}
}
if (!success) {
throw new NacosException(NacosException.SERVER_ERROR, ex);
}
//創建定時任務
GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);
}
@Override
public void destroy() throws NacosException {
shutdown = true;
}
@Override
public Map<String, Object> info() {
Map<String, Object> info = new HashMap<>(4);
info.put("addressServerHealth", isAddressServerHealth);
info.put("addressServerUrl", addressServerUrl);
info.put("envIdUrl", envIdUrl);
info.put("addressServerFailCount", addressServerFailCount);
return info;
}
private void syncFromAddressUrl() throws Exception {
RestResult<String> result = restTemplate
.get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());
if (result.ok()) {
isAddressServerHealth = true;
Reader reader = new StringReader(result.getData());
try {
afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
} catch (Throwable e) {
Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",
ExceptionUtil.getAllExceptionMsg(e));
}
addressServerFailCount = 0;
} else {
addressServerFailCount++;
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());
}
}
// 定時任務
class AddressServerSyncTask implements Runnable {
@Override
public void run() {
if (shutdown) {
return;
}
try {
//拉取服務列表
syncFromAddressUrl();
} catch (Throwable ex) {
addressServerFailCount++;
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
} finally {
GlobalExecutor.scheduleByCommon(this, 5_000L);
}
}
}
}
數據同步
Nacos數據同步分為全量同步和增量同步,所謂全量同步就是初始化數據一次性同步,而增量同步是指有數據增加的時候,只同步增加的數據。
全量同步
全量同步流程比較複雜,流程如上圖:
1:啟動一個定時任務線程DistroLoadDataTask載入數據,調用load()方法載入數據
2:調用loadAllDataSnapshotFromRemote()方法從遠程機器同步所有的數據
3:從namingProxy代理獲取所有的數據data
4:構造http請求,調用httpGet方法從指定的server獲取數據
5:從獲取的結果result中獲取數據bytes
6:處理數據processData
7:從data反序列化出datumMap
8:把數據存儲到dataStore,也就是本地緩存dataMap
9:監聽器不包括key,就創建一個空的service,並且綁定監聽器
10:監聽器listener執行成功後,就更新data store
任務啟動
在com.alibaba.nacos.core.distributed.distro.DistroProtocol
的構造函數中調用startDistroTask()
方法,該方法會執行startVerifyTask()
和startLoadTask()
,我們重點關註startLoadTask()
,該方法代碼如下:
/***
* 啟動DistroTask
*/
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
//啟動startVerifyTask,做數據同步校驗
startVerifyTask();
//啟動DistroLoadDataTask,批量載入數據
startLoadTask();
}
//啟動DistroLoadDataTask
private void startLoadTask() {
//處理狀態回調對象
DistroCallback loadCallback = new DistroCallback() {
//處理成功
@Override
public void onSuccess() {
isInitialized = true;
}
//處理失敗
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
//執行DistroLoadDataTask,是一個多線程
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
/***
* 啟動startVerifyTask
* 數據校驗
*/
private void startVerifyTask() {
GlobalExecutor.schedulePartitionDataTimedSync(
new DistroVerifyTask(
memberManager,
distroComponentHolder),
distroConfig.getVerifyIntervalMillis());
}
數據如何執行載入
上面方法會調用DistroLoadDataTask
對象,而該對象其實是個線程,因此會執行它的run方法,run方法會調用load()方法實現數據全量載入,代碼如下:
/***
* 數據載入過程
*/
@Override
public void run() {
try {
//載入數據
load();
if (!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
/***
* 載入數據,並同步
* @throws Exception
*/
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
//同步數據
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
//從遠程機器上同步所有數據
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
數據同步
數據同步會通過Http請求從遠程伺服器獲取數據,並同步到當前服務的緩存中,執行流程如下:
1:loadAllDataSnapshotFromRemote()從遠程載入所有數據,並處理同步到本機
2:transportAgent.getDatumSnapshot()遠程載入數據,通過Http請求執行遠程載入
3:dataProcessor.processSnapshot()處理數據同步到本地
數據處理完整邏輯代碼如下:loadAllDataSnapshotFromRemote()
方法
/***
* 從遠程機器上同步所有數據
*/
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
resourceType, transportAgent, dataProcessor);
return false;
}
//遍歷集群成員節點,不包括自己
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
//從遠程節點載入數據,調用http請求介面: distro/datums;
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
//處理數據
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
result);
if (result) {
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
遠程載入數據代碼如下:transportAgent.getDatumSnapshot()
方法
/***
* 從namingProxy代理獲取所有的數據data,從獲取的結果result中獲取數據bytes;
* @param targetServer target server.
* @return
*/
@Override
public DistroData getDatumSnapshot(String targetServer) {
try {
//從namingProxy代理獲取所有的數據data,從獲取的結果result中獲取數據bytes;
byte[] allDatum = NamingProxy.getAllData(targetServer);
//將數據封裝成DistroData
return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
} catch (Exception e) {
throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);
}
}
/**
* Get all datum from target server.
* NamingProxy.getAllData
* 執行HttpGet請求,並獲取返回數據
* @param server target server address
* @return all datum byte array
* @throws Exception exception
*/
public static byte[] getAllData(String server) throws Exception {
//參數封裝
Map<String, String> params = new HashMap<>(8);
//組裝URL,並執行HttpGet請求,獲取結果集
RestResult<String> result = HttpClient.httpGet(
"http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,
new ArrayList<>(), params);
//返回數據
if (result.ok()) {
return result.getData().getBytes();
}
throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "
+ result.getMessage());
}
處理數據同步到本地代碼如下:dataProcessor.processSnapshot()
/**
* 數據處理並更新本地緩存
* @param data
* @return
* @throws Exception
*/
private boolean processData(byte[] data) throws Exception {
if (data.length > 0) {
//從data反序列化出datumMap
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
// 把數據存儲到dataStore,也就是本地緩存dataMap
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
//監聽器不包括key,就創建一個空的service,並且綁定監聽器
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
//創建一個空的service
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// The Listener corresponding to the key value must not be empty
// 與鍵值對應的監聽器不能為空,這裡的監聽器類型是 ServiceManager
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if (Objects.isNull(listener)) {
return false;
}
//為空的綁定監聽器
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
//迴圈所有datumMap
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
//執行監聽器的onChange監聽方法
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}
// Update data store if listener executed successfully:
// 監聽器listener執行成功後,就更新dataStore
dataStore.put(entry.getKey(), entry.getValue());
}
}
return true;
}
到此實現數據全量同步,其實全量同步最終封裝的協議還是Http。
增量同步
新增數據使用非同步廣播同步:
1:DistroProtocol 使用 sync() 方法接收增量數據
2:向其他節點發佈廣播任務
調用 distroTaskEngineHolder 發佈延遲任務
3:調用 DistroDelayTaskProcessor.process() 方法進行任務投遞:將延遲任務轉換為非同步變更任務
4:執行變更任務 DistroSyncChangeTask.run() 方法:向指定節點發送消息
調用 DistroHttpAgent.syncData() 方法發送數據
調用 NamingProxy.syncData() 方法發送數據
5:異常任務調用 handleFailedTask() 方法進行處理
調用 DistroFailedTaskHandler 處理失敗任務
調用 DistroHttpCombinedKeyTaskFailedHandler 將失敗任務重新投遞成延遲任務。
增量數據入口
我們回到服務註冊,服務註冊的InstanceController.register()
就是數據入口,它會調用ServiceManager.registerInstance()
,執行數據同步的時候,調用addInstance()
,在該方法中會執行DistroConsistencyServiceImpl.put()
,該方法是增量同步的入口,會調用distroProtocol.sync()
方法,代碼如下:
/***
* 數據保存
* @param key key of data, this key should be globally unique
* @param value value of data
* @throws NacosException
*/
@Override
public void put(String key, Record value) throws NacosException {
//將數據存入到dataStore中
onPut(key, value);
//使用distroProtocol同步數據
distroProtocol.sync(
new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),
DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
sync()
方法會執行任務發佈,代碼如下:
public void sync(DistroKey distroKey, DataOperation action, long delay) {
//向除了自己外的所有節點廣播
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
//從distroTaskEngineHolder獲取延時執行引擎,並將distroDelayTask任務添加進來
//執行延時任務發佈
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
增量同步操作
延遲任務對象我們可以從DistroTaskEngineHolder
構造函數中得知是DistroDelayTaskProcessor
,代碼如下:
/***
* 構造函數指定任務處理器
* @param distroComponentHolder
*/
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
//指定任務處理器defaultDelayTaskProcessor
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
它延遲執行的時候會執行process
方法,該方法正是執行數據同步的地方,它會執行DistroSyncChangeTask任務,代碼如下:
/***
* 任務處理過程
* @param task task.
* @return
*/
@Override
public boolean process(NacosTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
//將延遲任務變更成非同步任務,非同步任務對象是一個線程
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
//將任務添加到NacosExecuteTaskExecuteEngine中,並執行
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
}
return false;
}
DistroSyncChangeTask
實質上是任務的開始,它自身是一個線程,所以會執行它的run方法,而run方法這是數據同步操作,代碼如下:
/***
* 執行數據同步
*/
@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
//獲取本地緩存數據
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
//向其他節點同步數據
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
handleFailedTask();
}
}
數據同步會執行調用syncData
,該方法其實就是通過Http協議將數據發送到其他節點實現數據同步,代碼如下:
/***
* 向其他節點同步數據
* @param data data
* @param targetServer target server
* @return
*/
@Override
public boolean syncData(DistroData data, String targetServer) {
if (!memberManager.hasMember(targetServer)) {
return true;
}
//獲取數據位元組數組
byte[] dataContent = data.getContent();
//通過Http協議同步數據
return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
最後:一定要跟著講師所給的源碼自行走一遍!!!
本文由傳智教育博學谷 - 狂野架構師教研團隊發佈
如果本文對您有幫助,歡迎關註和點贊;如果您有任何建議也可留言評論或私信,您的支持是我堅持創作的動力
轉載請註明出處!