Dubbo-服務註冊中心之AbstractRegistryFactory等源碼

来源:https://www.cnblogs.com/maratong/archive/2020/02/19/12333730.html
-Advertisement-
Play Games

在上文中介紹了基礎類AbstractRegistry類的解釋,在本篇中將繼續介紹該包下的其他類。 FailbackRegistry 該類繼承了AbstractRegistry,AbstractRegistry中的註冊訂閱等方法,實際上就是一些記憶體緩存的變化,而真正的註冊訂閱的實現邏輯在Failbac ...


在上文中介紹了基礎類AbstractRegistry類的解釋,在本篇中將繼續介紹該包下的其他類。

FailbackRegistry

該類繼承了AbstractRegistry,AbstractRegistry中的註冊訂閱等方法,實際上就是一些記憶體緩存的變化,而真正的註冊訂閱的實現邏輯在FailbackRegistry實現,並且FailbackRegistry提供了失敗重試的機制。
初始化

// Scheduled executor service
    // 定時任務執行器
    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
    // 失敗重試定時器,定時去檢查是否有請求失敗的,如有,無限次重試。
    private final ScheduledFuture<?> retryFuture;

    // 註冊失敗的URL集合
    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

    // 取消註冊失敗的URL集合
    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();

    // 訂閱失敗的監聽器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

    // 取消訂閱失敗的監聽器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

    // 通知失敗的URL集合
    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();

    /**
     * The time in milliseconds the retryExecutor will wait
     */
    // 重試頻率
    private final int retryPeriod;

構造函數

public FailbackRegistry(URL url) {
        super(url);
        // 從url中讀取重試頻率,如果為空,則預設5000ms
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 創建失敗重試定時器
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    //重試
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

構造函數主要是創建了失敗重試的定時器,重試頻率從URL取,如果沒有設置,則預設為5000ms。
在該類中對註冊、取消註冊、訂閱、取消訂閱進行了重寫操作,代碼邏輯相對簡單。

 @Override
    public void register(URL url) {
        super.register(url);
        //首先從失敗的緩存中刪除該url
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            // 向註冊中心發送一個註冊請求
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            // 如果開啟了啟動時檢測,則直接拋出異常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            // 把這個註冊失敗的url放入緩存,並且定時重試。
            failedRegistered.add(url);
        }
    }

在註冊中它會失敗的註冊緩存和失敗的未註冊緩存集合中移除該URL,然後向註冊中心執行註冊。

AbstractRegistryFactory

該類實現了RegistryFactory介面,抽象了createRegistry方法,它實現了Registry的容器。
初始化

 private static final ReentrantLock LOCK = new ReentrantLock();

    // Registry Collection Map<RegistryAddress, Registry>
    // Registry 集合
    private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();

銷毀所有的Registry對象,並清理緩存數據

public static Collection<Registry> getRegistries() {
        return Collections.unmodifiableCollection(REGISTRIES.values());
    }

public static void destroyAll() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        // Lock up the registry shutdown process
        // 獲得鎖
        LOCK.lock();
        try {
            for (Registry registry : getRegistries()) {
                try {
                    // 銷毀
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            // 清空緩存
            REGISTRIES.clear();
        } finally {
            // Release the lock
            // 釋放鎖
            LOCK.unlock();
        }
    }

該方法是實現了Registry介面的方法,這裡最要註意的是createRegistry,因為AbstractRegistryFfactory本身就是抽象類,而createRegistry也是抽象方法,為了讓子類只要關註該方法,比如說redis實現的註冊中心和zookeeper實現的註冊中心創建方式肯定不同,而他們相同的一些操作都已經在AbstractRegistryFactory中實現,所以只要關註且實現該抽象方法即可。

@Override
    public Registry getRegistry(URL url) {
        // 修改url
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        // 計算key值
        String key = url.toServiceString();
        // Lock the registry access process to ensure a single instance of the registry
        // 獲得鎖
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            // 創建Registry對象
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            // 添加到緩存。
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            // 釋放鎖
            LOCK.unlock();
        }
    }

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 左傾堆,用於堆的快速合併。 規則: ① 節點的鍵值小於或等於它的左右子節點的鍵值。 ② 節點的左孩子的NPL >= 右孩子的NPL。 ③ 節點的NPL = 它的右孩子的NPL + 1。 測試文件 main.cpp: #include <iostream> #include "LeftistHeap. ...
  • 在最近一段時間里,通過搜集有關資料加上自己的理解,設計了一款輕量級RPC,起了一個名字 lightWeightRPC 。它擁有一個RPC常見的基本功能。主要功能和特點如下: 利用Spring實現依賴註入與參數配置 利用Netty來實現客戶端與服務端的遠程通信 利用Hessian來實現序列化 設置Zo ...
  • 在本版本中引入了SPI機制,關於Java的SPI機制與Dubbo的SPI機制在以前的文章中介紹過。 傳送門: "Dubbo的SPI機制與JDK機制的不同及原理分析" 因為設計的RPC框架是基於Spring的,時常會遇到依賴註入問題。Spring中也有SPI機制,但是它有有個缺點,就是在利用SPI機制 ...
  • 在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現 非同步請求 就十分重要,當有多個請求線程時,需要設計一個 線程池 來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況 ...
  • 在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。 在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用n ...
  • 什麼是RPC RPC (Remote Procedure Call Protocol), 遠程過程調用,通俗的解釋就是:客戶端在不知道調用細節的情況下,調用存在於遠程電腦上的某個對象,就像調用本地應用程式中的對象一樣,不需要瞭解底層網路技術的協議。 簡單的整體工作流程 請求端發送一個調用的數據包, ...
  • 在dubbo中,關於註冊中心Registry的有關實現封裝在了dubbo registry模塊中。提供者(Provider)個消費者(Consumer)都是通過註冊中心進行資源的調度。當服務啟動時,provider會調用註冊中心的register方法將自己的服務通過url的方式發佈到註冊中心,而co ...
  • 在網路傳輸中,怎麼確保通道連接的可用性是一個很重要的問題,簡單的說,在網路通信中有客戶端和服務端,一個負責發送請求,一個負責接收請求,在保證連接有效性的背景下,這兩個物體扮演了什麼角色,心跳機制能有效的保證連接的可用性,那它的機制是什麼,下文中將會詳細講解。 網路層的可用性 首先講一下TCP,在du ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...