Dubbo原理和源碼解析之服務引用

来源:https://www.cnblogs.com/cyfonly/archive/2018/05/24/9079228.html
-Advertisement-
Play Games

本文分析Dubbo服務引用的實現原理,併進行詳細的代碼跟蹤與解析。 ...


一、框架設計

在官方Dubbo 開髮指南框架設計部分,給出了引用服務時序圖:

 另外,在官方Dubbo 用戶指南集群容錯部分,給出了服務引用的各功能組件關係圖:

 本文將根據以上兩張圖,分析服務引用的實現原理,併進行詳細的代碼跟蹤與解析。

二、原理和源碼解析

2.1 創建代理

Dubbo 基於 Spring 的 Schema 擴展實現 XML 配置解析DubboNamespaceHandler 會將 <dubbo:reference> 標簽解析為 ReferenceBeanReferenceBean 實現了 FactoryBean,因此當它在代碼中有引用時,會調用 ReferenceBean#getObject() 方法進入節點註冊和服務發現流程。

ReferenceBean.java

public Object getObject() throws Exception {
    return get();
}

ReferenceConfig.java

public synchronized T get() {
    if (destroyed){
        throw new IllegalStateException("Already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}

private void init() {
    //.......忽略
    ref = createProxy(map);
}


private T createProxy(Map<String, String> map) {
    //.....忽略
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
    //.....忽略
    // 創建服務代理
    return (T) proxyFactory.getProxy(invoker);
}

 2.2 服務發現

因為通過註冊中心,因此在 ReferenceConfig.java#createProxy() 方法中,進入 RegistryProtocol.java#refer() 方法。

RegistryProtocol.java

private Cluster cluster;

public void setCluster(Cluster cluster) {
    this.cluster = cluster;
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
            Constants.PROVIDERS_CATEGORY 
            + "," + Constants.CONFIGURATORS_CATEGORY 
            + "," + Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}

RegistryDirectory 通過 RegistryDirectory#subscribeUrl() 向 Zookeeper 訂閱服務節點信息並 watch 變更,這樣就實現了服務自動發現。

2.3 Invoker選取

2.3.1 Cluster

上面我之所以把設置 Cluster 的代碼貼上,是因為此處涉及到一個 Dubbo 服務框架核心的概念——微內核和插件機制(此處會單獨一篇文章詳細介紹):

 

有關 Dubbo 的設計原則,請查看Dubbo《一些設計上的基本常識》。

Cluster 類的定義如下:

Cluster.java

@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}

cluster 的類型是 Cluster$Adaptive,實際上是一個通用的代理類,它會根據 URL 中的 cluster 參數值定位到實際的 Cluster 實現類(預設是 FailoverCluster)。 由於 ExtensionLoader 在實例化對象時,會在實例化完成之後自動套上 Wrapper 類,而 MockerClusterWrapper 就是這樣一個 Wrapper。

MockerClusterWrapper.java

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory, this.cluster.join(directory));
    }
}

也就是說,實例化出來的 FailoverCluster 會作為參數賦予 MockerClusterWrapper#cluster,而 MockClusterWrapper 會作為參數賦予 RegistryProtocol#cluster。因此 RegistryProtocol#doRefer() 中調用 cluster.join(directory) 實際上是調用的 MockClusterWrapper#join(directory) 使用這種機制,可以把一些公共的處理放在 Wrapper 類中,實現代碼和功能收斂。

MockClusterInvoker.java

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(),
                             Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
    if (value.length() == 0 || value.equalsIgnoreCase("false")){
        //no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.info("force-mock: " + invocation.getMethodName() + 
                        " force-mock enabled , url : " +  directory.getUrl());
        }
        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);
        }catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                if (logger.isWarnEnabled()) {
                    logger.info("fail-mock: " + invocation.getMethodName() + 
                            " fail-mock enabled , url : " +  directory.getUrl(), e);
                }
                //fail:mock
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}

這裡還涉及到 Dubbo 另外一個核心機制——Mock。Mock 可以在測試中模擬服務調用的各種異常情況,還用來實現服務降級。 MockClusterWrapper.join() 方法可知,實際創建的 ClusterInvoker 是封裝了 FailoverClusterInvoker 的 MockerClusterInvoker

MockerClusterInvoker 中,調用之前 Dubbo 會先檢查 URL 中是否有 mock 參數(通過服務治理後臺 Consumer 端的屏蔽和容錯進行設置,或者直接動態設置 mock 參數值),如果存在且以 force 開頭,則不發起遠程調用直接執行降級邏輯;如果存在且以 fail 開頭,則在遠程調用異常時才會執行降級邏輯。

因此,通過 MockerClusterWrapper 成功地在 Invoker 中植入了 Mock 機制。

2.3.2 Directory

RegistryProtocol#doRefer() 中可以看到,服務發現過程是通過 RegistryDirectory 向 Zookeeper 訂閱來實現的。 先看看 Directory 類之間的關係:

看下 Directory 介面的定義:

Directory.java

public interface Directory<T> extends Node {
    
    Class<T> getInterface();

    List<Invoker<T>> list(Invocation invocation) throws RpcException;
}

Directory 可以看做是對應 Interface 的 Invoker 列表,而這個列表可能是動態變化的,比如註冊中心推送變更。

通過 ReferenceConfig#createProxy() 方法可知,StaticDirectory 主要用於多註冊中心引用的場景,它的 invoker 列表是通過參數傳入的、固定的。在此不做更詳細的解析了。

RegistryDirectory 用於使用單註冊中心發現服務的場景。RegistryDirectory 沒有重寫 list() 方法,所以使用 AbstractDirectory#list() 方法:

AbstractDirectory.java

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    List<Invoker<T>> invokers = doList(invocation);
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || 
                         router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + 
                              ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

RegistryDirectory.java

/**
 * 獲取 invoker 列表
 */
public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " +
                      NetUtils.getLocalHost() + " access service " + getInterface().getName() + 
                      " from registry " + getUrl().getAddress() + " use dubbo version " +
                       Version.getVersion() + 
                       ", Please check registry access list (whitelist/blacklist).");
    }
    List<Invoker<T>> invokers = null;
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;  //本地緩存
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        String methodName = RpcUtils.getMethodName(invocation);
        //根據方法名從本地緩存中獲取invoker列表,此處略
        //……
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}


/**
 * 節點變更通知
 */
public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category) 
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + 
                        url + " from registry " + getUrl().getAddress() + 
                        " to consumer " + NetUtils.getLocalHost());
        }
    }
    // configurators 
    if (configuratorUrls != null && configuratorUrls.size() >0 ){
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && routerUrls.size() >0 ){
        List<Router> routers = toRouters(routerUrls);
        if(routers != null){ // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // 合併override參數
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    refreshInvoker(invokerUrls);
}


/**
 * 根據invokerURL列表轉換為invoker列表
 */
private void refreshInvoker(List<URL> invokerUrls){
    //......
    Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表
    //......
}


/**
 * 合併url參數 順序為override > -D >Consumer > Provider
 */
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    //......
    URL url = mergeUrl(providerUrl);
    //......
}

在 dolist() 方法中,如果通過服務治理禁止 Consumer 訪問的話,此處直接拋出響應的異常。

RegistryDirectory 實現了 NotifyListener,在 ZK 節點變化時能收到通知更新記憶體緩存,其中 RegistryDirectory#mergeUrl() 方法中會按照優先順序合併參數(動態配置在此處生效)。

服務引用時從記憶體緩存中獲取並返回invoker列表,並根據路由規則再進行一次過濾。

2.3.3 Router

Router 的作用就是從 Directory 的 invoker 列表中刷選出符合路由規則的 invoker 子集。目前 Dubbo 提供了基於IP、應用名和協議等的靜態路由功能,功能和實現比較簡單,在此不做過多解釋。

2.3.4 LoadBalance

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 第 1 章 JavaScript簡介 使用 Node.js 搭建 Web 伺服器 JavaScript 的類型有數字、字元串、布爾值、函數和對象。還有 undefined 和 null ,以及數組、日期和正則表達式。 操作符 .cnblogs markdown table th:nth child( ...
  • 做前端開發已經好幾年了,對設計模式一直沒有深入學習總結過。隨著架構相關的工作越來越多,越來越能感覺到設計模式成為了我前進道路上的一個阻礙。所以從今天開始深入學習和總結經典的設計模式以及面向對象的幾大原則。 今天第一天,首先來講策略模式。 什麼是策略模式? GoF四兄弟的經典《設計模式》中,對策略模式 ...
  • 一、抽象類: 抽象類是特殊的類,只是不能被實例化;除此以外,具有類的其他特性;重要的是抽象類可以包括抽象方法,這是普通類所不能的。抽象方法只能聲明於抽象類中,且不包含任何實現,派生類必須覆蓋它們。另外,抽象類可以派生自一個抽象類,可以覆蓋基類的抽象方法也可以不覆蓋,如果不覆蓋,則其派生類必須覆蓋它們 ...
  • 工廠方法模式簡介 定義 定義一個用於創建對象的介面,讓子類決定實例化哪一個類。工廠方法使一個類的實例化延遲到其子類。 簡單工廠 VS 工廠方法 簡單工廠:在工廠類中包含了必要的邏輯判斷,根據客戶端的選擇條件動態實例化相關的類,對於客戶端來說,去除了與具體產品的依賴。但是,當在工廠類中需要添加新的實例 ...
  • 今晚,夜色朦朧,正是幹壞事的好時光。所以,打算和大伙分享點不一樣的內容。 ...
  • Java開源生鮮電商平臺-性能優化以及伺服器優化的設計與架構(源碼可下載) 說明:Java開源生鮮電商平臺-性能優化以及伺服器優化的設計與架構,我採用以下三種維度來講解 1. 代碼層面。 2. 資料庫層面。 3. 伺服器層面 誠然,性能優化這個方面的確是一個長期的過程,並不是大伙們看了我的文章後就覺 ...
  • 前言 秒殺架構持續優化中,基於自身認知不足之處在所難免,也請大家指正,共同進步。文章標題來自碼友的建議,希望可以把阻塞隊列ArrayBlockingQueue這個隊列替換成Disruptor,由於之前曾接觸過這個東西,聽說很不錯,正好藉此機會整合進來。 簡介 LMAX Disruptor是一個高性能 ...
  • Django之META與前後端交互 1 提交表單之GET 前端提交數據與發送 1)提交表單數據 2)提交JSON數據 後端的數據接收與響應 1)接收GET請求數據 2)接收POST請求數據 3)響應請求 GET 請求過程 前端通過ajax發起GET請求,json格式數據 var data = { " ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...