Eureka應用註冊與集群數據同步源碼解析

来源:https://www.cnblogs.com/zhixiang-org-cn/archive/2019/10/21/11716916.html
-Advertisement-
Play Games

在之前的 "EurekaClient自動裝配及啟動流程解析" 一文中我們提到過,在構造 類時,會把自身註冊到服務端,本文就來分析一下這個註冊流程 客戶端發起註冊 這個方法中包含的 和`instanceInfo`兩個對象在之前的文章中都已經單獨拿出來了: "Eureka中重要的對象" 服務端接受註冊 ...


在之前的EurekaClient自動裝配及啟動流程解析一文中我們提到過,在構造DiscoveryClient類時,會把自身註冊到服務端,本文就來分析一下這個註冊流程

客戶端發起註冊
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

這個方法中包含的registrationClientinstanceInfo兩個對象在之前的文章中都已經單獨拿出來了:Eureka中重要的對象

服務端接受註冊

服務端接受註冊的Controller在ApplicationResource類中,這裡需要註意的是普通客戶端註冊時其中參數isReplication為false,這個參數就是控制集群是否同步的表示

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // 一系列的參數校驗
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // AWS的一些東西,不用細看
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
    //註冊
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

接著往下看註冊的處理邏輯

 public void register(final InstanceInfo info, final boolean isReplication) {
   // 租約過期時間
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
   // 註冊
        super.register(info, leaseDuration, isReplication);
   //集群複製
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

Lease租約這個類之前也分析過了,不再展開了

服務端保存註冊信息

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
          //獲取鎖
            read.lock();
          //獲取該實例的註冊信息
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
              //如果不存在則添加  
              gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // 當存在這個應用的註冊信息時
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                //使用註冊時間長的一方的應用信息
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {

                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
          //創建租約
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
          //如果存在租約則更新開始時間
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
              //添加到最近註冊隊列
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

           // 獲得應用實例最終狀態
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
  1. 實例信息是由一個map對象保存的,在這個map中,key是應用的appName,value是另外一個map,在這個map中key是應用的id,而value則是應用的租約信息,map對象如下:
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 
registry        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
  1. 大體流程為,先查看是否存在該appName的註冊信息,如不存在則創建。
  2. 接著查看是否存在該id的註冊信息,如果存在判斷客戶端的最後修改時間,記錄最後修改的實例,如果不存在則設置自我保護模式的幾個參數
  3. 根據實例創建對應的租約信息,然後添加到map對象中
  4. 添加到最近註冊隊列
  5. 添加到應用實例覆蓋狀態映射
  6. 設置應用實例最終狀態,添加最近租約變更隊列,設置緩存等
集群數據同步

isReplication屬性為true的時候,就會牽扯到集群信息同步了

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info ,
                                  InstanceStatus newStatus , boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

關於PeerEurekaNode在之前的文章也提到了,保存了集群節點信息,這裡可以看到是迴圈所有的集群節點,然後排除本身的信息之後調用了replicateInstanceActionsToPeers方法

private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

這裡重點關註Register分支

public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

在不關心Eureka自有的調度處理相關的前提下,核心代碼是replicationClient.register(info),也就是說,如果當客戶端註冊時指定需要同步集群信息時,Eureka會把這個客戶端再註冊到它所在的集群其它的節點上

服務端發起集群數據同步

在之前得EurekaServer自動裝配及啟動流程解析一文中,我們提到過在初始化服務端的時候會從EurekaServer集群中同步數據,也就是下麵這段代碼:

public class EurekaServerBootstrap {   
    protected void initEurekaServerContext() throws Exception {
  //xxxx
    
                EurekaServerContextHolder.initialize(this.serverContext);
    
                log.info("Initialized server context");
    
                // 從其他 Eureka-Server 拉取註冊信息
                int registryCount = this.registry.syncUp();
                this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
                // Register all monitoring statistics.
                EurekaMonitors.registerAllStats();
        }

數據同步的代碼在syncUp

 public int syncUp() {
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
              //未讀取到註冊信息則開始等待
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
           // 獲取註冊信息
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                      //判斷是否可以註冊,這裡基於AWS環境的判斷,如果不是AWS環境直接返回true,故不需要關心這個問題
                        if (isRegisterable(instance)) {
                          //註冊
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

這裡的處理流程就是當前EurekaServer獲取到客戶端的註冊信息之後,就會再次調用上邊咱們提到的服務端Controller調用的register方法,當然這次調用時isReplication參數就變為true了

本文由博客一文多發平臺 OpenWrite 發佈!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Python是一種面向對象的編程語言,語法簡潔而清晰,具有豐富和強大的類庫。Python在設計上堅持了清晰劃一的風格,這使得Python成為一門易讀、易維護,並且被大量用戶所歡迎的、用途廣泛的語言。對於初學編程者來說,首選Python是個非常棒的選擇。 零基礎學編程 零基礎學編程,用python入門 ...
  • 兩種併發編程模型 多進程 進程間通信常用的幾種方式: 文件 管道 消息隊列 多線程 一個進程中存在的多個線程,通常通過共用記憶體來通信,(說的非常非常粗俗,就是通過類似“全局變數”的一些數據對象來通信。不知道這種說對不對) 兩者優缺點 多線程優點 線程被稱為“輕量級進程”,一般啟動更快,而開啟一個進程 ...
  • 庸置疑,Spring 早已成為 Java 後端開發事實上的行業標準,無數的公司選擇 Spring 作為基礎的開發框架,大部分 Java 後端程式員在日常工作中也會接觸到 Spring ,因此,如何用好 Spring ,也就成為 Java 程式員的必修課之一。 同時,Spring Boot 和 Spr ...
  • 本文接著上一篇寫的《Java微服務(二):服務消費者與提供者搭建》,上一篇文章主要講述了消費者與服務者的搭建與簡單的實現。其中重點需要註意配置文件中的幾個坑。 本章節介紹一些零散的內容:服務的負載均衡,序列化和熔斷 1.服務負載均衡 負載均衡可分為軟體負載均衡和硬體負載均衡。在我們日常開發中,一般很 ...
  • 最近用到NodeInstantiator批量加入實體 剛開始用的時候一直程式崩潰 錯誤代碼大致如下: 大體上代碼結構類似上面這樣,簡單起見NodeInstantiator的model就寫成2 manage_system是c++寫的一個QObject子類,存儲一些載入進來的數據 manage_syst ...
  • 表單驗證分為前端驗證和伺服器端驗證。 伺服器端驗證方面,Java提供了主要用於數據驗證的JSR 303規範,而Hibernate Validator實現了JSR 303規範。 項目依賴加入spring-boot-starter-thymeleaf時,預設就會加入Hibernate Validat... ...
  • ASCII碼:只有英文和拉丁字元,一個字元占一個位元組,8位 gb2312:只有6700個中文 1980年 gbk10:存了2萬多個中文 1995年 gb18030:27000中文 2000年 utf-32:一個字元占4個位元組 utf-16:一個字元占2個位元組或2個位元組以上 utf-8:英文用ASCI ...
  • 本文源碼: "GitHub·點這裡" || "GitEE·點這裡" 一、生活場景描述 1、請假審批流程 公司常見的請假審批流程:請假天數 2、流程圖解 3、代碼實現 二、責任鏈模式 1、基礎概念 責任鏈模式是一種對象的行為模式。在責任鏈模式里,很多對象由每一個對象對其下個的引用而連接起來形成一條鏈式 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...