Dubbo-服務註冊中心之AbstractRegistryFactory等源碼

来源:https://www.cnblogs.com/maratong/archive/2020/02/19/12333730.html
-Advertisement-
Play Games

在上文中介紹了基礎類AbstractRegistry類的解釋,在本篇中將繼續介紹該包下的其他類。 FailbackRegistry 該類繼承了AbstractRegistry,AbstractRegistry中的註冊訂閱等方法,實際上就是一些記憶體緩存的變化,而真正的註冊訂閱的實現邏輯在Failbac ...


在上文中介紹了基礎類AbstractRegistry類的解釋,在本篇中將繼續介紹該包下的其他類。

FailbackRegistry

該類繼承了AbstractRegistry,AbstractRegistry中的註冊訂閱等方法,實際上就是一些記憶體緩存的變化,而真正的註冊訂閱的實現邏輯在FailbackRegistry實現,並且FailbackRegistry提供了失敗重試的機制。
初始化

// Scheduled executor service
    // 定時任務執行器
    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
    // 失敗重試定時器,定時去檢查是否有請求失敗的,如有,無限次重試。
    private final ScheduledFuture<?> retryFuture;

    // 註冊失敗的URL集合
    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

    // 取消註冊失敗的URL集合
    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();

    // 訂閱失敗的監聽器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

    // 取消訂閱失敗的監聽器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

    // 通知失敗的URL集合
    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();

    /**
     * The time in milliseconds the retryExecutor will wait
     */
    // 重試頻率
    private final int retryPeriod;

構造函數

public FailbackRegistry(URL url) {
        super(url);
        // 從url中讀取重試頻率,如果為空,則預設5000ms
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 創建失敗重試定時器
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    //重試
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

構造函數主要是創建了失敗重試的定時器,重試頻率從URL取,如果沒有設置,則預設為5000ms。
在該類中對註冊、取消註冊、訂閱、取消訂閱進行了重寫操作,代碼邏輯相對簡單。

 @Override
    public void register(URL url) {
        super.register(url);
        //首先從失敗的緩存中刪除該url
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            // 向註冊中心發送一個註冊請求
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            // 如果開啟了啟動時檢測,則直接拋出異常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            // 把這個註冊失敗的url放入緩存,並且定時重試。
            failedRegistered.add(url);
        }
    }

在註冊中它會失敗的註冊緩存和失敗的未註冊緩存集合中移除該URL,然後向註冊中心執行註冊。

AbstractRegistryFactory

該類實現了RegistryFactory介面,抽象了createRegistry方法,它實現了Registry的容器。
初始化

 private static final ReentrantLock LOCK = new ReentrantLock();

    // Registry Collection Map<RegistryAddress, Registry>
    // Registry 集合
    private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();

銷毀所有的Registry對象,並清理緩存數據

public static Collection<Registry> getRegistries() {
        return Collections.unmodifiableCollection(REGISTRIES.values());
    }

public static void destroyAll() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        // Lock up the registry shutdown process
        // 獲得鎖
        LOCK.lock();
        try {
            for (Registry registry : getRegistries()) {
                try {
                    // 銷毀
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            // 清空緩存
            REGISTRIES.clear();
        } finally {
            // Release the lock
            // 釋放鎖
            LOCK.unlock();
        }
    }

該方法是實現了Registry介面的方法,這裡最要註意的是createRegistry,因為AbstractRegistryFfactory本身就是抽象類,而createRegistry也是抽象方法,為了讓子類只要關註該方法,比如說redis實現的註冊中心和zookeeper實現的註冊中心創建方式肯定不同,而他們相同的一些操作都已經在AbstractRegistryFactory中實現,所以只要關註且實現該抽象方法即可。

@Override
    public Registry getRegistry(URL url) {
        // 修改url
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        // 計算key值
        String key = url.toServiceString();
        // Lock the registry access process to ensure a single instance of the registry
        // 獲得鎖
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            // 創建Registry對象
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            // 添加到緩存。
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            // 釋放鎖
            LOCK.unlock();
        }
    }

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 左傾堆,用於堆的快速合併。 規則: ① 節點的鍵值小於或等於它的左右子節點的鍵值。 ② 節點的左孩子的NPL >= 右孩子的NPL。 ③ 節點的NPL = 它的右孩子的NPL + 1。 測試文件 main.cpp: #include <iostream> #include "LeftistHeap. ...
  • 在最近一段時間里,通過搜集有關資料加上自己的理解,設計了一款輕量級RPC,起了一個名字 lightWeightRPC 。它擁有一個RPC常見的基本功能。主要功能和特點如下: 利用Spring實現依賴註入與參數配置 利用Netty來實現客戶端與服務端的遠程通信 利用Hessian來實現序列化 設置Zo ...
  • 在本版本中引入了SPI機制,關於Java的SPI機制與Dubbo的SPI機制在以前的文章中介紹過。 傳送門: "Dubbo的SPI機制與JDK機制的不同及原理分析" 因為設計的RPC框架是基於Spring的,時常會遇到依賴註入問題。Spring中也有SPI機制,但是它有有個缺點,就是在利用SPI機制 ...
  • 在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現 非同步請求 就十分重要,當有多個請求線程時,需要設計一個 線程池 來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況 ...
  • 在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。 在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用n ...
  • 什麼是RPC RPC (Remote Procedure Call Protocol), 遠程過程調用,通俗的解釋就是:客戶端在不知道調用細節的情況下,調用存在於遠程電腦上的某個對象,就像調用本地應用程式中的對象一樣,不需要瞭解底層網路技術的協議。 簡單的整體工作流程 請求端發送一個調用的數據包, ...
  • 在dubbo中,關於註冊中心Registry的有關實現封裝在了dubbo registry模塊中。提供者(Provider)個消費者(Consumer)都是通過註冊中心進行資源的調度。當服務啟動時,provider會調用註冊中心的register方法將自己的服務通過url的方式發佈到註冊中心,而co ...
  • 在網路傳輸中,怎麼確保通道連接的可用性是一個很重要的問題,簡單的說,在網路通信中有客戶端和服務端,一個負責發送請求,一個負責接收請求,在保證連接有效性的背景下,這兩個物體扮演了什麼角色,心跳機制能有效的保證連接的可用性,那它的機制是什麼,下文中將會詳細講解。 網路層的可用性 首先講一下TCP,在du ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...