Dubbo-服務註冊中心之AbstractRegistry

来源:https://www.cnblogs.com/maratong/archive/2020/02/19/12333720.html
-Advertisement-
Play Games

在dubbo中,關於註冊中心Registry的有關實現封裝在了dubbo registry模塊中。提供者(Provider)個消費者(Consumer)都是通過註冊中心進行資源的調度。當服務啟動時,provider會調用註冊中心的register方法將自己的服務通過url的方式發佈到註冊中心,而co ...


1.png

在dubbo中,關於註冊中心Registry的有關實現封裝在了dubbo-registry模塊中。提供者(Provider)個消費者(Consumer)都是通過註冊中心進行資源的調度。當服務啟動時,provider會調用註冊中心的register方法將自己的服務通過url的方式發佈到註冊中心,而consumer訂閱其他服務時,會將訂閱的服務通過url發送給註冊中心(URL中通常會包含各種配置)。當某個服務被關閉時,它則會從註冊中心中移除,當某個服務被修改時,則會調用notify方法觸發所有的監聽器。
首先簡單介紹一下在dubbo的基本統一數據模型URL

統一數據模型URL

在dubbo中定義的url與傳統的url有所不同,用於在擴展點之間傳輸數據,可以從url參數中獲取配置信息等數據,這一點很重要。
描述一個dubbo協議的服務

dubbo://192.168.1.6:20880/moe.cnkirito.sample.HelloService?timeout=3000

描述一個消費者

consumer://30.5.120.217/org.apache.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=1209&qos.port=33333&side=consumer&timestamp=1545721827784

接下來將著重介紹幾個重要的類。

AbstractRegistry

AbstractRegistry實現的是Registry介面,是Registry的抽象類。為了減輕註冊中心的壓力,在該類中實現了把本地url緩存到記憶體緩存property文件中,並且實現了註冊中心的註冊、訂閱等方法。
2.png

在該類中有介個關於url的變數。

  • private final Set<URL> registered = new ConcurrentHashSet<URL>();
    -> 記錄已經註冊服務的URL集合,註冊的URL不僅僅可以是服務提供者的,也可以是服務消費者的。
  • private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    -> 消費者url訂閱的監聽器集合
  • private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
    -> 某個消費者被通知的服務URL集合,最外部URL的key是消費者的URL,value是一個map集合,裡面的map中的key為分類名,value是該類下的服務url集合。
  • private URL registryUrl;
    -> 註冊中心URL
  • private File file;
    -> 本地磁碟緩存文件,緩存註冊中心的數據

    初始化

    public AbstractRegistry(URL url) {
        //1. 設置配置中心的地址
        setUrl(url);
        //2. 配置中心的URL中是否配置了同步保存文件屬性,否則預設為false
        syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
        //3. 配置信息本地緩存的文件名
        String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
        //逐層創建文件目錄
        File file = null;
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        this.file = file;
        //如果現有配置緩存,則從緩存文件中載入屬性
        loadProperties();
        notify(url.getBackupUrls());
    }

載入本地磁碟緩存文件到記憶體緩存中,也就是把文件中的數據寫入到properties中

 private void loadProperties() {
        if (file != null && file.exists()) {
            InputStream in = null;
            try {
                in = new FileInputStream(file);
                // 把數據寫入到記憶體緩存中
                properties.load(in);
                if (logger.isInfoEnabled()) {
                    logger.info("Load registry store file " + file + ", data: " + properties);
                }
            } catch (Throwable e) {
                logger.warn("Failed to load registry store file " + file, e);
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    }

註冊與取消註冊

對registered變數執行add和remove操作

@Override
    public void register(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("register url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Register: " + url);
        }
        registered.add(url);
    }

    @Override
    public void unregister(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("unregister url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unregister: " + url);
        }
        registered.remove(url);
    }

訂閱與取消訂閱

通過消費者url從subscribed變數中獲取該消費者的所有監聽器集合,然後將該監聽器放入到集合中,取消同理。

@Override
    public void subscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("subscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("subscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Subscribe: " + url);
        }
        // 獲得該消費者url 已經訂閱的服務 的監聽器集合
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners == null) {
            subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
            listeners = subscribed.get(url);
        }
        // 添加某個服務的監聽器
        listeners.add(listener);
    }

    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("unsubscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("unsubscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unsubscribe: " + url);
        }
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners != null) {
            listeners.remove(listener);
        }
    }

服務的恢復

註冊的恢復包括註冊服務的恢復和訂閱服務的恢復,因為在記憶體中表留了註冊的服務和訂閱的服務,因此在恢復的時候會重新拉取這些數據,分別調用發佈和訂閱的方法來重新將其錄入到註冊中心中。

protected void recover() throws Exception {
        // register
        //把記憶體緩存中的registered取出來遍歷進行註冊
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                register(url);
            }
        }
        // subscribe
        //把記憶體緩存中的subscribed取出來遍歷進行訂閱
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    subscribe(url, listener);
                }
            }
        }
    }

通知

protected void notify(List<URL> urls) {
        if (urls == null || urls.isEmpty()) return;
        // 遍歷訂閱URL的監聽器集合,通知他們
        for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
            URL url = entry.getKey();

            // 匹配
            if (!UrlUtils.isMatch(url, urls.get(0))) {
                continue;
            }
            // 遍歷監聽器集合,通知他們
            Set<NotifyListener> listeners = entry.getValue();
            if (listeners != null) {
                for (NotifyListener listener : listeners) {
                    try {
                        notify(url, listener, filterEmpty(url, urls));
                    } catch (Throwable t) {
                        logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }

    /**
     * 通知監聽器,URL 變化結果
     * @param url
     * @param listener
     * @param urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        // 將urls進行分類
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                // 按照url中key為category對應的值進行分類,如果沒有該值,就找key為providers的值進行分類
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    // 分類結果放入result
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        // 獲得某一個消費者被通知的url集合(通知的 URL 變化結果)
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            // 添加該消費者對應的url
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        // 處理通知監聽器URL 變化結果
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            // 把分類標實和分類後的列表放入notified的value中
            // 覆蓋到 `notified`
            // 當某個分類的數據為空時,會依然有 urls 。其中 `urls[0].protocol = empty` ,通過這樣的方式,處理所有服務提供者為空的情況。
            categoryNotified.put(category, categoryList);
            // 保存到文件
            saveProperties(url);
            //通知監聽器
            listener.notify(categoryList);
        }
    }

在構造函數的最後一句,調用notify(url.getBackupUrls()); 來將註冊中心url返回的urls來進行通知。從下麵代碼可以開出返回的urls是通過url的參數獲得的。

public List<URL> getBackupUrls() {
        List<URL> urls = new ArrayList<URL>();
        urls.add(this);
        String[] backups = getParameter(Constants.BACKUP_KEY, new String[0]);
        if (backups != null && backups.length > 0) {
            for (String backup : backups) {
                urls.add(this.setAddress(backup));
            }
        }
        return urls;
    }

然後獲取遍歷所有訂閱URL,類型Map<URL,Set<NotifyListener>> ,判斷遍歷中的當前url與傳入的backupURL是否匹配,匹配了繼續向下執行,否則則跳過這個url,再處理下一個url。當向下執行時,獲取遍歷當前url的監聽器。對每個監聽器執行notify(url, listener, filterEmpty(url, urls))

  protected static List<URL> filterEmpty(URL url, List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            List<URL> result = new ArrayList<URL>(1);
            result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
            return result;
        }
        return urls;
    }

如果urls為空,則將根據url的信息新建一個url,並設置協議為空協議,放入到urls中。
然後執行notify方法,將backupURLS進行分類,放入到result中。
在上述中遍歷所有訂閱的urls,然後在每個url中再執行nofity,所以接下來的步驟可以理解成遍歷訂閱的urls,在迴圈內部獲取每個url的被通知的urls集合。
每個url獲取一個被通知的urls集合,categoryNotified
之後遍歷backURLs,它會覆蓋掉原來被通知的集合categoryNotified
遍歷結束後,會將結果保存到文件中,
最後通知監聽器處理,最後的這個通知方法在之後的篇章解釋。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 8、JSP 8.1、什麼是JSP Java Server Pages : Java伺服器端頁面,也和Servlet一樣,用於動態Web技術! 最大的特點: 寫JSP就像在寫HTML 區別: HTML只給用戶提供靜態的數據 JSP頁面中可以嵌入JAVA代碼,為用戶提供動態數據; 8.2、JSP原理 思 ...
  • 類載入與實例化 基本步驟 類裝載分為以下 5 個步驟: 載入:根據查找路徑找到相應的 class 文件然後導入 檢查:檢查載入的 class 文件的正確性 準備:給類中的靜態變數分配記憶體空間 解析:虛擬機將常量池中的符號引用替換成直接引用的過程。符號引用理解為一個標示,而直接引用直接指向記憶體中的地址 ...
  • 左傾堆,用於堆的快速合併。 規則: ① 節點的鍵值小於或等於它的左右子節點的鍵值。 ② 節點的左孩子的NPL >= 右孩子的NPL。 ③ 節點的NPL = 它的右孩子的NPL + 1。 測試文件 main.cpp: #include <iostream> #include "LeftistHeap. ...
  • 在最近一段時間里,通過搜集有關資料加上自己的理解,設計了一款輕量級RPC,起了一個名字 lightWeightRPC 。它擁有一個RPC常見的基本功能。主要功能和特點如下: 利用Spring實現依賴註入與參數配置 利用Netty來實現客戶端與服務端的遠程通信 利用Hessian來實現序列化 設置Zo ...
  • 在本版本中引入了SPI機制,關於Java的SPI機制與Dubbo的SPI機制在以前的文章中介紹過。 傳送門: "Dubbo的SPI機制與JDK機制的不同及原理分析" 因為設計的RPC框架是基於Spring的,時常會遇到依賴註入問題。Spring中也有SPI機制,但是它有有個缺點,就是在利用SPI機制 ...
  • 在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現 非同步請求 就十分重要,當有多個請求線程時,需要設計一個 線程池 來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況 ...
  • 在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。 在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用n ...
  • 什麼是RPC RPC (Remote Procedure Call Protocol), 遠程過程調用,通俗的解釋就是:客戶端在不知道調用細節的情況下,調用存在於遠程電腦上的某個對象,就像調用本地應用程式中的對象一樣,不需要瞭解底層網路技術的協議。 簡單的整體工作流程 請求端發送一個調用的數據包, ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...