dubbo源碼閱讀之集群(故障處理策略)

来源:https://www.cnblogs.com/zhuge134/archive/2019/05/09/10841892.html
-Advertisement-
Play Games

dubbo集群概述 dubbo集群功能的切入點在ReferenceConfig.createProxy方法以及Protocol.refer方法中。 在ReferenceConfig.createProxy方法中,如果用戶指定多個提供者url或註冊中心url,那麼會創建多個Invoker,然後用Sta ...


dubbo集群概述

dubbo集群功能的切入點在ReferenceConfig.createProxy方法以及Protocol.refer方法中。
在ReferenceConfig.createProxy方法中,如果用戶指定多個提供者url或註冊中心url,那麼會創建多個Invoker,然後用StaticDirectory將這多個Invoker封裝在一起,然後用相應的Cluster實現類將這個靜態的服務目錄包裝成一個Invoker,每種集群類都對應一種Invoker的集群包裝類,例如,FailoverClusterInvoker,FailbackClusterInvoker,FailfastClusterInvoker,FailsafeClusterInvoker,ForkingClusterInvoker等等,而這些封裝集群邏輯的Invoker包裝類都繼承自AbstractClusterInvoker抽象類。這個抽象類里主要實現了調用時的狀態檢查,Invocation類參數設置,負載均衡,服務提供者可用性檢測等邏輯,而服務調用失敗後的行為邏輯則交由子類實現。

AbstractClusterInvoker.invoke

首先我們從這個方法看起,這個方法是Invoker類的調用入口,

@Override
// 這個方法的主要作用是為調用做一些前置工作,
// 包括檢查狀態,設置參數,從服務目錄取出invoker列表,根據<方法名>.loadbalance參數值獲取相應的負載均衡器
// 最後調用模板方法
public Result invoke(final Invocation invocation) throws RpcException {
    // 檢查該Invoker是否已經被銷毀
    // 在監聽到註冊中心變更刷新Invoker列表時可能會銷毀不再可用的Invoker
    checkWhetherDestroyed();

    // binding attachments into invocation.
    // 將RpcContext中的參數綁定到invocation上
    // 用戶可以通過RpcContext向每次調用傳遞不同的參數
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // 列出所有的服務提供者
    // 這個方法直接調用服務目錄的list方法
    List<Invoker<T>> invokers = list(invocation);
    // 根據url中的loadbalance參數值獲取相應的負載均衡器,預設是隨機負載均衡RandomLoadBalance
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    // 添加調用id,唯一標識本次調用
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 模板方法,子類實現
    return doInvoke(invocation, invokers, loadbalance);
}

FailoverClusterInvoker.doInvoke

我們以預設的集群類FailoverClusterInvoker為例,分析一下這個類的doInvoke方法

// 這個方法主要實現了重試的邏輯,這也正是這個類的特性,故障轉移功能
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // 拷貝一份本地引用,invokers可能會變
    List<Invoker<T>> copyInvokers = invokers;
    // 檢查提供者列表是否為空
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 獲取調用的方法的retries參數值,重試次數等於該值+1,因為第一次調用不算重試
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    // 迴圈重試
    // 記錄最後一次出現的異常
    RpcException le = null; // last exception.
    // 記錄調用失敗的提供者
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    // 記錄調用過的提供者的地址,
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        // 每次迴圈都要重新檢查狀態,重新列出可用的提供者Invoker,並檢查可用的Invoker是否為空
        // 因為這些狀態或提供者信息隨時都可能發生變化
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        // 從可用的Invoker列表總選擇一個
        // 選擇邏輯中考慮了“粘滯”調用和負載均衡的邏輯
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        // 添加到已經調用的列表中
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            // 對於業務異常直接拋出,這個異常會穿透dubbo框架直接拋給用戶
            // 非業務異常例如網路問題,連接斷開,提供者下線等可以通過故障轉移,重試機制解決,
            // 這裡之所以直接拋出是因為一旦發生了業務異常就不是dubbo框架能處理的了,再重試也沒有意義了
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

這個方法的邏輯還是比較清晰的,就是重試,這也就是這個這個類的主要功能,故障轉移,如果調用發生異常,就重試調用其他可用的提供者。其中select方法的實現在抽象類AbstractClusterInvoker中。

AbstractClusterInvoker.select

// 這個方法主要實現了“粘滯”調用的邏輯
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();

    // 可以通過在url中設置sticky參數的值來決定要不要啟用“粘滯”調用的特性
    // 預設不啟用該特性
    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    // 如果緩存的粘滯Invoker已經不在可用列表裡了,那麼就應當將其移除
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem
    // 如果啟用了粘滯調用,並且粘滯調用存在,並且粘滯的Invoker不在已經調用失敗的Invoker列表中
    // 那麼直接返回粘滯的Invoker
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        if (availablecheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }

    // 根據負載均衡策略選擇一個Invoker
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    // 設置粘滯的Invoker
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

這個方法主要實現了“粘滯”調用的邏輯。

AbstractClusterInvoker.doSelect

// 根據負載均衡策略選擇一個Invoker
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    // 根據負載均衡策略選擇一個Invoker
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    // 對於選擇出來的Invoker還要再判斷其可用性
    // 對於如下情況需要再次選擇Invoker
    // 1. 選出的Invoker在調用失敗列表中
    // 2. 設置了可用檢查為true並且選出的Invoker不可用
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            // 重新選擇Invoker, 首先排除調用失敗列表進行選擇,實在不行會去調用失敗列表中看能不能找到又“活過來”的提供者
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    // 如果沒有重選出新的Invoker,那麼直接用下一個Invoker
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

第一次選擇是不考慮調用失敗列表的,所以選出來的Invoker有可能在調用失敗列表中,這時需要進行重選。

AbstractClusterInvoker.reselect

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

    //Allocating one in advance, this list is certain to be used.
    List<Invoker<T>> reselectInvokers = new ArrayList<>(
            invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    // First, try picking a invoker not in `selected`.
    for (Invoker<T> invoker : invokers) {
        if (availablecheck && !invoker.isAvailable()) {
            continue;
        }

        // 排除調用失敗列表中的Invoker
        if (selected == null || !selected.contains(invoker)) {
            reselectInvokers.add(invoker);
        }
    }

    // 如果還有剩餘的Invoker, 那麼根據負載均衡邏策略選擇一個
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // Just pick an available invoker using loadbalance policy
    // 是在沒有可用的,只能從調用失敗列表中找找看有沒有可用的
    // 因為在重試期間有可能之前調用失敗的提供者變成可用的了
    if (selected != null) {
        for (Invoker<T> invoker : selected) {
            if ((invoker.isAvailable()) // available first
                    && !reselectInvokers.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
    }
    // 再次選擇
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // 實在沒有可用的提供者,只能返回null了
    return null;
}

其實從這幾個選擇的方法中可以看出來,dubbo的作者還是很用心的,盡最大可能保證調用的成功。

FailfastClusterInvoker

快速失敗,只調用一次,失敗後直接拋異常。代碼很簡單,就不多說了

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
            throw (RpcException) e;
        }
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                        + " select from all providers " + invokers + " for service " + getInterface().getName()
                        + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion()
                        + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                e.getCause() != null ? e.getCause() : e);
    }
}

FailsafeClusterInvoker

失敗安全的故障處理策略,所謂失敗安全是指在調用失敗後,不拋異常只記錄日誌。

@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        // 返回一個空結果,用戶需要對返回結果進行判斷
        return new RpcResult(); // ignore
    }
}

FailbackClusterInvoker

失敗後記錄下失敗的調用,之後以一定的間隔時間進行重試,這種策略很適合通知類的服務調用。重試間隔固定為5秒, 重試次數可以通過參數設置,預設是3次。

ForkingClusterInvoker

這種策略比較有意思,每次調用都會起多個線程並行第跑,誰先跑出結果就用誰的,這種估計很少用吧,誰這麼財大氣粗,大把大把的資源用來浪費。
不過這很像一些分散式計算框架中的推測執行策略,如果有些任務跑的慢,那麼就會在其他節點也跑這個任務,誰先跑完就用誰的結果,比如spark中就有推測執行的機制。

總結

不同的集群包裝類有不同的故障處理策略,預設的故障轉移,此外常用的有快速失敗,失敗安全,定時重試,合併調用等等。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一直以來,自己讀過的技術類書籍也不少了,但是都犯了一個毛病就是沒有很好的記錄下來,有些東西可能並不是平日開發中時時刻刻用到的,隨著時間的延長,學過的東西慢慢也就淡忘了,剛好最近有些時間,也正打算把<<設計模式之禪>>這本書好好的通讀一遍,順便把所想所得詳細的記錄一下,也方便以後查閱和回顧。 好,以上 ...
  • 心知天氣數據API 的QPS 在高峰時期已經達到數千的量級,如何承載這樣海量的併發請求,使客戶能穩定及時的獲取到所需數據自然也是心知技術團隊一路以來不斷探索的主題。 ...
  • 提出問題 「領域驅動設計」之於微服務,好比麥當勞之於漢堡(個人更喜歡肯德基,漢堡要大些,麥當勞的漢堡,想吃頓飽飯,請先給我上6個😂)。但是TDD測試驅動、MDD模型驅動好像也很火啊,到底什麼在驅動? 分析問題 不用著急,這是三個5分鐘就能區分開的概念。開發中在協同工作。 首先糾正兩個誤區。DDD是 ...
  • 一.SpringAOP的概述。 AOP(Aspect Oriented Programming),面向切麵編程,通過預編譯方式和運行期間動態代理實現程式的功能的統一維護的技術。AOP是OOP(面向對象編程)的擴展和延伸。舉個例子,讓大家對AOP印象更加深刻點。 比如許可權校驗。實際開發中,我們知道不是 ...
  • 一、JDK的安裝 1、打開下載好的安裝包(我在這裡附上一個百度雲連接,https://pan.baidu.com/s/1o3nx0kbmecAISeneGqykLQ 提取碼:jnw6) 傻瓜式安裝,直接點下一步就行。 2、安裝路徑 安裝路徑隨意,只要不是中文路徑就Ok!!!我比較懶,直接使用的預設安 ...
  • 參考自:https://blog.csdn.net/dreaming__ldx/article/details/84976834 https://blog.csdn.net/acterminate/article/details/79339494 題意: 給你一個數組,將數組裡的所有元素進行全排列, ...
  • 線程的理解應該結合進程來對比理解更直接 如果我們操作系統當做一個工廠的話,那麼創建一個進程就相當於在這個工廠裡面新增了一個車間,車間裡面存放了很多資源,而車間要運行起來很顯然的標誌就是流水線,而這些流水線就是線程,可以說線程是執行代碼的最小單位。 而線程和進程兩者在使用層面上有很大的相似性,所以開啟 ...
  • Python是一種廣泛使用的解釋型、高級編程、通用型編程語言,由吉多·範羅蘇姆創造,第一版發佈於1991年。可以視之為一種改良(加入一些其他編程語言的優點,如面向對象)的LISP。Python的設計哲學強調代碼的可讀性和簡潔的語法(尤其是使用空格縮進劃分代碼塊,而非使用大括弧或者關鍵詞)。相比於C+ ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...