你還在擔心rpc介面超時嗎

来源:https://www.cnblogs.com/csong7876/archive/2020/07/04/13236955.html
-Advertisement-
Play Games

在使用dubbo時,通常會遇到timeout這個屬性,timeout屬性的作用是:給某個服務調用設置超時時間,如果服務在設置的時間內未返回結果,則會拋出調用超時異常:TimeoutException,在使用的過程中,我們有時會對provider和consumer兩個配置都會設置timeout值,那麼 ...


在使用dubbo時,通常會遇到timeout這個屬性,timeout屬性的作用是:給某個服務調用設置超時時間,如果服務在設置的時間內未返回結果,則會拋出調用超時異常:TimeoutException,在使用的過程中,我們有時會對provider和consumer兩個配置都會設置timeout值,那麼服務調用過程中會以哪個為準?橘子同學今天主要針對這個問題進行分析和擴展。

三種設置方式

以provider配置為例:

#### 方法級別
<dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl">
   <dubbo:method name="test" timeout="10000"/>
</dubbo:service>
#### 介面級別
<dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl" timeout="10000"/>
#### 全局級別
<dubbo:service ="10000"/>

優先順序選擇

在dubbo中如果provider和consumer都配置了相同的一個屬性,比如本文分析的timeout,其實它們是有優先順序的,consumer方法配置 > provider方法配置 > consumer介面配置 > provider介面配置 > consumer全局配置 > provider全局配置。所以對於小橘開始的提出的問題就有了結果,會以消費者配置的為準,接下結合源碼來進行解析,其實源碼很簡單,在RegistryDirectory類中將服務列表轉換為DubboInvlker方法中進行了處理:

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                        " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                        " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                        ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            // 重點就是下麵這個方法
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }
     

重點就是上面mergeUrl方法,將provider和comsumer的url參數進行了整合,在mergeUrl方法有會調用ClusterUtils.mergeUrl方法進行整合,因為這個方法比較簡單,就是對一些參數進行了整合了,會用consumer參數進行覆蓋,這裡就不分析了,如果感興趣的同學可以去研究一下。

超時處理

在配置設置了超時timeout,那麼代碼中是如何處理的,這裡咱們在進行一下擴展,分析一下dubbo中是如何處理超時的,在調用服務方法,最後都會調用DubboInvoker.doInvoke方法,咱們就從這個方法開始分析:

  @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                // For compatibility
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);

                Result result;
                // 非同步處理
                if (isAsyncFuture) {
                    // register resultCallback, sometimes we need the async result being processed by the filter chain.
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                // 同步處理
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

在這個方法中,就以同步模式進行分析,看request方法,request()方法會返回一個DefaultFuture類,在去調用DefaultFuture.get()方法,這裡其實涉及到一個在非同步中實現同步的技巧,咱們這裡不做分析,所以重點就在get()方法里:

 @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

在調用get()方法時,會去調用get(timeout)這個方法,在這個方法中會傳一個timeout欄位,在和timeout就是咱們配置的那個參數,在這個方法中咱們要關註下麵一個代碼塊:

 if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // 線程阻塞
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 在超時時間里,還沒有結果,則拋出超時異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }

重點看await()方法,會進行阻塞timeout時間,如果阻塞時間到了,則會喚醒往下執行,超時跳出while迴圈中,判斷是否有結果返回,如果沒有(這個地方要註意:只有有結果返回,或超時才跳出迴圈中),則拋出超時異常。講到這裡,超時原理基本上其實差不多了,DefaultFuture這個類還有個地方需要註意,在初始化DefaultFuture對象時,會去創建一個超時的延遲任務,延遲時間就是timeout值,在這個延遲任務中也會調用signal()方法喚醒阻塞。

分批調用

不過在調用rpc遠程介面,如果對方的介面不能一次承載返回請求結果能力,我們一般做法是分批調用,將調用一次分成調用多次,然後對每次結果進行匯聚,當然也可以做用利用多線程的能力去執行。後面文章小橘將會介紹這種模式,敬請關註哦!

/**
 * Description:通用 分批調用工具類
 * 場景:
 * <pre>
 *     比如List參數的size可能為 幾十個甚至上百個
 *     如果invoke介面比較慢,傳入50個以上會超時,那麼可以每次傳入20個,分批執行。
 * </pre>
 * Author: OrangeCsong
 */
public class ParallelInvokeUtil {

    private ParallelInvokeUtil() {}

    /**
     * @param sourceList 源數據
     * @param size 分批大小
     * @param buildParam 構建函數
     * @param processFunction 處理函數
     * @param <R> 返回值
     * @param <T> 入參\
     * @param <P> 構建參數
     * @return
     */
    public static <R, T, P> List<R> partitionInvokeWithRes(List<T> sourceList, Integer size,
                                                           Function<List<T>, P> buildParam,
                                                           Function<P, List<R>> processFunction) {

        if (CollectionUtils.isEmpty(sourceList)) {
            return new ArrayList<>(0);
        }
        Preconditions.checkArgument(size > 0, "size大小必須大於0");

        return Lists.partition(sourceList, size).stream()
                .map(buildParam)
                .map(processFunction)
                .filter(Objects::nonNull)
                .reduce(new ArrayList<>(),
                        (resultList1, resultList2) -> {
                            resultList1.addAll(resultList2);
                            return resultList1;
                        });

    }
}

本文由博客群發一文多發等運營工具平臺 OpenWrite 發佈


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 項目描述 Hi,大家好,今天分享的項目是《個人家庭財務管理系統》,本系統是針對個人家庭內部的財務管理而開發的,大體功能模塊如下: 系統管理模塊 驗證用戶登錄功能:該功能主要是驗證用戶登錄時登錄名和密碼的正確性。 退出系統功能:註銷當前登錄的用戶。 家庭成員管理模塊 家庭成員管理功能:主要實現了對家庭 ...
  • import pandas from docx import Document excel=pandas.read_excel(r'F:\word練習\數據.xlsx',header=None) 文件=Document(r'F:\word練習\a.docx') 表=文件.add_table(4,4) ...
  • from docx import Document from docx import WD_PARAGRAPH_ALIGNMENT w=Documeent(r'F:\word練習\a.docx') #第一種方法 t=w.add_table(3,3) t.cell(0,0).text='李先生' #第 ...
  • 線程與進程相似,但線程是一個比進程更小的執行單位。一個進程在其執行的過程中可以產生多個線程。與進程不同的是同類的多個線程共用同一塊記憶體空間和一組系統資源,所以系統在產生一個線程,或是在各個線程之間作切換工作時,負擔要比進程小得多,也正因為如此,線程也被稱為輕量級進程。 程式是含有指令和數據的文件,被 ...
  • from docx import Document w=Document(r'F:\word練習\表格.docx') #刪除表 print(len(w.tables)) t=w.tables[0] t._element.getparent().remove(t._element) print(len ...
  • from docx import Document w=Document(r'F:\word練習\表格.docx') table_1=w.tables[0] #刪除行 print(len(table_1.rows)) row2=table_1.rows[1] row2._element.getpar ...
  • 值傳遞和引用傳遞: 值傳遞和引用傳遞的區別並不是傳遞的內容。而是實參到底有沒有被覆制一份給形參。在判斷實參內容有沒有受影響的時候,要看傳的的是什麼,如果你傳遞的是個地址,那麼就看這個地址的變化會不會有影響,而不是看地址指向的對象的變化。 Java中當傳遞的參數是對象時,其實還是值傳遞的,只不過對於對 ...
  • pygame 的聲音播放 1. sound 對象 在初始化聲音設備後就可以讀取一個音樂文件到一個 Sound 對象中。pygame.mixer.sound() 接收一個文件名,也可以是一個文件對象,不過這個文件對象必須是 WAV 或者 OGG 文件。 hello_sound = pygame.mix ...
一周排行
    -Advertisement-
    Play Games
  • 前言 在我們開發過程中基本上不可或缺的用到一些敏感機密數據,比如SQL伺服器的連接串或者是OAuth2的Secret等,這些敏感數據在代碼中是不太安全的,我們不應該在源代碼中存儲密碼和其他的敏感數據,一種推薦的方式是通過Asp.Net Core的機密管理器。 機密管理器 在 ASP.NET Core ...
  • 新改進提供的Taurus Rpc 功能,可以簡化微服務間的調用,同時可以不用再手動輸出模塊名稱,或調用路徑,包括負載均衡,這一切,由框架實現並提供了。新的Taurus Rpc 功能,將使得服務間的調用,更加輕鬆、簡約、高效。 ...
  • 順序棧的介面程式 目錄順序棧的介面程式頭文件創建順序棧入棧出棧利用棧將10進位轉16進位數驗證 頭文件 #include <stdio.h> #include <stdbool.h> #include <stdlib.h> 創建順序棧 // 指的是順序棧中的元素的數據類型,用戶可以根據需要進行修改 ...
  • 前言 整理這個官方翻譯的系列,原因是網上大部分的 tomcat 版本比較舊,此版本為 v11 最新的版本。 開源項目 從零手寫實現 tomcat minicat 別稱【嗅虎】心有猛虎,輕嗅薔薇。 系列文章 web server apache tomcat11-01-官方文檔入門介紹 web serv ...
  • C總結與剖析:關鍵字篇 -- <<C語言深度解剖>> 目錄C總結與剖析:關鍵字篇 -- <<C語言深度解剖>>程式的本質:二進位文件變數1.變數:記憶體上的某個位置開闢的空間2.變數的初始化3.為什麼要有變數4.局部變數與全局變數5.變數的大小由類型決定6.任何一個變數,記憶體賦值都是從低地址開始往高地 ...
  • 如果讓你來做一個有狀態流式應用的故障恢復,你會如何來做呢? 單機和多機會遇到什麼不同的問題? Flink Checkpoint 是做什麼用的?原理是什麼? ...
  • C++ 多級繼承 多級繼承是一種面向對象編程(OOP)特性,允許一個類從多個基類繼承屬性和方法。它使代碼更易於組織和維護,並促進代碼重用。 多級繼承的語法 在 C++ 中,使用 : 符號來指定繼承關係。多級繼承的語法如下: class DerivedClass : public BaseClass1 ...
  • 前言 什麼是SpringCloud? Spring Cloud 是一系列框架的有序集合,它利用 Spring Boot 的開發便利性簡化了分散式系統的開發,比如服務註冊、服務發現、網關、路由、鏈路追蹤等。Spring Cloud 並不是重覆造輪子,而是將市面上開發得比較好的模塊集成進去,進行封裝,從 ...
  • class_template 類模板和函數模板的定義和使用類似,我們已經進行了介紹。有時,有兩個或多個類,其功能是相同的,僅僅是數據類型不同。類模板用於實現類所需數據的類型參數化 template<class NameType, class AgeType> class Person { publi ...
  • 目錄system v IPC簡介共用記憶體需要用到的函數介面shmget函數--獲取對象IDshmat函數--獲得映射空間shmctl函數--釋放資源共用記憶體實現思路註意 system v IPC簡介 消息隊列、共用記憶體和信號量統稱為system v IPC(進程間通信機制),V是羅馬數字5,是UNI ...