Dubbo-服務註冊中心之AbstractRegistry

来源:https://www.cnblogs.com/maratong/archive/2020/02/19/12333720.html
-Advertisement-
Play Games

在dubbo中,關於註冊中心Registry的有關實現封裝在了dubbo registry模塊中。提供者(Provider)個消費者(Consumer)都是通過註冊中心進行資源的調度。當服務啟動時,provider會調用註冊中心的register方法將自己的服務通過url的方式發佈到註冊中心,而co ...


1.png

在dubbo中,關於註冊中心Registry的有關實現封裝在了dubbo-registry模塊中。提供者(Provider)個消費者(Consumer)都是通過註冊中心進行資源的調度。當服務啟動時,provider會調用註冊中心的register方法將自己的服務通過url的方式發佈到註冊中心,而consumer訂閱其他服務時,會將訂閱的服務通過url發送給註冊中心(URL中通常會包含各種配置)。當某個服務被關閉時,它則會從註冊中心中移除,當某個服務被修改時,則會調用notify方法觸發所有的監聽器。
首先簡單介紹一下在dubbo的基本統一數據模型URL

統一數據模型URL

在dubbo中定義的url與傳統的url有所不同,用於在擴展點之間傳輸數據,可以從url參數中獲取配置信息等數據,這一點很重要。
描述一個dubbo協議的服務

dubbo://192.168.1.6:20880/moe.cnkirito.sample.HelloService?timeout=3000

描述一個消費者

consumer://30.5.120.217/org.apache.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=1209&qos.port=33333&side=consumer&timestamp=1545721827784

接下來將著重介紹幾個重要的類。

AbstractRegistry

AbstractRegistry實現的是Registry介面,是Registry的抽象類。為了減輕註冊中心的壓力,在該類中實現了把本地url緩存到記憶體緩存property文件中,並且實現了註冊中心的註冊、訂閱等方法。
2.png

在該類中有介個關於url的變數。

  • private final Set<URL> registered = new ConcurrentHashSet<URL>();
    -> 記錄已經註冊服務的URL集合,註冊的URL不僅僅可以是服務提供者的,也可以是服務消費者的。
  • private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    -> 消費者url訂閱的監聽器集合
  • private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
    -> 某個消費者被通知的服務URL集合,最外部URL的key是消費者的URL,value是一個map集合,裡面的map中的key為分類名,value是該類下的服務url集合。
  • private URL registryUrl;
    -> 註冊中心URL
  • private File file;
    -> 本地磁碟緩存文件,緩存註冊中心的數據

    初始化

    public AbstractRegistry(URL url) {
        //1. 設置配置中心的地址
        setUrl(url);
        //2. 配置中心的URL中是否配置了同步保存文件屬性,否則預設為false
        syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
        //3. 配置信息本地緩存的文件名
        String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
        //逐層創建文件目錄
        File file = null;
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        this.file = file;
        //如果現有配置緩存,則從緩存文件中載入屬性
        loadProperties();
        notify(url.getBackupUrls());
    }

載入本地磁碟緩存文件到記憶體緩存中,也就是把文件中的數據寫入到properties中

 private void loadProperties() {
        if (file != null && file.exists()) {
            InputStream in = null;
            try {
                in = new FileInputStream(file);
                // 把數據寫入到記憶體緩存中
                properties.load(in);
                if (logger.isInfoEnabled()) {
                    logger.info("Load registry store file " + file + ", data: " + properties);
                }
            } catch (Throwable e) {
                logger.warn("Failed to load registry store file " + file, e);
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    }

註冊與取消註冊

對registered變數執行add和remove操作

@Override
    public void register(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("register url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Register: " + url);
        }
        registered.add(url);
    }

    @Override
    public void unregister(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("unregister url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unregister: " + url);
        }
        registered.remove(url);
    }

訂閱與取消訂閱

通過消費者url從subscribed變數中獲取該消費者的所有監聽器集合,然後將該監聽器放入到集合中,取消同理。

@Override
    public void subscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("subscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("subscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Subscribe: " + url);
        }
        // 獲得該消費者url 已經訂閱的服務 的監聽器集合
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners == null) {
            subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
            listeners = subscribed.get(url);
        }
        // 添加某個服務的監聽器
        listeners.add(listener);
    }

    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("unsubscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("unsubscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unsubscribe: " + url);
        }
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners != null) {
            listeners.remove(listener);
        }
    }

服務的恢復

註冊的恢復包括註冊服務的恢復和訂閱服務的恢復,因為在記憶體中表留了註冊的服務和訂閱的服務,因此在恢復的時候會重新拉取這些數據,分別調用發佈和訂閱的方法來重新將其錄入到註冊中心中。

protected void recover() throws Exception {
        // register
        //把記憶體緩存中的registered取出來遍歷進行註冊
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                register(url);
            }
        }
        // subscribe
        //把記憶體緩存中的subscribed取出來遍歷進行訂閱
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    subscribe(url, listener);
                }
            }
        }
    }

通知

protected void notify(List<URL> urls) {
        if (urls == null || urls.isEmpty()) return;
        // 遍歷訂閱URL的監聽器集合,通知他們
        for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
            URL url = entry.getKey();

            // 匹配
            if (!UrlUtils.isMatch(url, urls.get(0))) {
                continue;
            }
            // 遍歷監聽器集合,通知他們
            Set<NotifyListener> listeners = entry.getValue();
            if (listeners != null) {
                for (NotifyListener listener : listeners) {
                    try {
                        notify(url, listener, filterEmpty(url, urls));
                    } catch (Throwable t) {
                        logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }

    /**
     * 通知監聽器,URL 變化結果
     * @param url
     * @param listener
     * @param urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        // 將urls進行分類
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                // 按照url中key為category對應的值進行分類,如果沒有該值,就找key為providers的值進行分類
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    // 分類結果放入result
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        // 獲得某一個消費者被通知的url集合(通知的 URL 變化結果)
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            // 添加該消費者對應的url
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        // 處理通知監聽器URL 變化結果
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            // 把分類標實和分類後的列表放入notified的value中
            // 覆蓋到 `notified`
            // 當某個分類的數據為空時,會依然有 urls 。其中 `urls[0].protocol = empty` ,通過這樣的方式,處理所有服務提供者為空的情況。
            categoryNotified.put(category, categoryList);
            // 保存到文件
            saveProperties(url);
            //通知監聽器
            listener.notify(categoryList);
        }
    }

在構造函數的最後一句,調用notify(url.getBackupUrls()); 來將註冊中心url返回的urls來進行通知。從下麵代碼可以開出返回的urls是通過url的參數獲得的。

public List<URL> getBackupUrls() {
        List<URL> urls = new ArrayList<URL>();
        urls.add(this);
        String[] backups = getParameter(Constants.BACKUP_KEY, new String[0]);
        if (backups != null && backups.length > 0) {
            for (String backup : backups) {
                urls.add(this.setAddress(backup));
            }
        }
        return urls;
    }

然後獲取遍歷所有訂閱URL,類型Map<URL,Set<NotifyListener>> ,判斷遍歷中的當前url與傳入的backupURL是否匹配,匹配了繼續向下執行,否則則跳過這個url,再處理下一個url。當向下執行時,獲取遍歷當前url的監聽器。對每個監聽器執行notify(url, listener, filterEmpty(url, urls))

  protected static List<URL> filterEmpty(URL url, List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            List<URL> result = new ArrayList<URL>(1);
            result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
            return result;
        }
        return urls;
    }

如果urls為空,則將根據url的信息新建一個url,並設置協議為空協議,放入到urls中。
然後執行notify方法,將backupURLS進行分類,放入到result中。
在上述中遍歷所有訂閱的urls,然後在每個url中再執行nofity,所以接下來的步驟可以理解成遍歷訂閱的urls,在迴圈內部獲取每個url的被通知的urls集合。
每個url獲取一個被通知的urls集合,categoryNotified
之後遍歷backURLs,它會覆蓋掉原來被通知的集合categoryNotified
遍歷結束後,會將結果保存到文件中,
最後通知監聽器處理,最後的這個通知方法在之後的篇章解釋。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 8、JSP 8.1、什麼是JSP Java Server Pages : Java伺服器端頁面,也和Servlet一樣,用於動態Web技術! 最大的特點: 寫JSP就像在寫HTML 區別: HTML只給用戶提供靜態的數據 JSP頁面中可以嵌入JAVA代碼,為用戶提供動態數據; 8.2、JSP原理 思 ...
  • 類載入與實例化 基本步驟 類裝載分為以下 5 個步驟: 載入:根據查找路徑找到相應的 class 文件然後導入 檢查:檢查載入的 class 文件的正確性 準備:給類中的靜態變數分配記憶體空間 解析:虛擬機將常量池中的符號引用替換成直接引用的過程。符號引用理解為一個標示,而直接引用直接指向記憶體中的地址 ...
  • 左傾堆,用於堆的快速合併。 規則: ① 節點的鍵值小於或等於它的左右子節點的鍵值。 ② 節點的左孩子的NPL >= 右孩子的NPL。 ③ 節點的NPL = 它的右孩子的NPL + 1。 測試文件 main.cpp: #include <iostream> #include "LeftistHeap. ...
  • 在最近一段時間里,通過搜集有關資料加上自己的理解,設計了一款輕量級RPC,起了一個名字 lightWeightRPC 。它擁有一個RPC常見的基本功能。主要功能和特點如下: 利用Spring實現依賴註入與參數配置 利用Netty來實現客戶端與服務端的遠程通信 利用Hessian來實現序列化 設置Zo ...
  • 在本版本中引入了SPI機制,關於Java的SPI機制與Dubbo的SPI機制在以前的文章中介紹過。 傳送門: "Dubbo的SPI機制與JDK機制的不同及原理分析" 因為設計的RPC框架是基於Spring的,時常會遇到依賴註入問題。Spring中也有SPI機制,但是它有有個缺點,就是在利用SPI機制 ...
  • 在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現 非同步請求 就十分重要,當有多個請求線程時,需要設計一個 線程池 來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況 ...
  • 在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。 在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用n ...
  • 什麼是RPC RPC (Remote Procedure Call Protocol), 遠程過程調用,通俗的解釋就是:客戶端在不知道調用細節的情況下,調用存在於遠程電腦上的某個對象,就像調用本地應用程式中的對象一樣,不需要瞭解底層網路技術的協議。 簡單的整體工作流程 請求端發送一個調用的數據包, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...