dubbo之心跳機制

来源:https://www.cnblogs.com/maratong/archive/2020/02/19/12333733.html
-Advertisement-
Play Games

在網路傳輸中,怎麼確保通道連接的可用性是一個很重要的問題,簡單的說,在網路通信中有客戶端和服務端,一個負責發送請求,一個負責接收請求,在保證連接有效性的背景下,這兩個物體扮演了什麼角色,心跳機制能有效的保證連接的可用性,那它的機制是什麼,下文中將會詳細講解。 網路層的可用性 首先講一下TCP,在du ...


在網路傳輸中,怎麼確保通道連接的可用性是一個很重要的問題,簡單的說,在網路通信中有客戶端和服務端,一個負責發送請求,一個負責接收請求,在保證連接有效性的背景下,這兩個物體扮演了什麼角色,心跳機制能有效的保證連接的可用性,那它的機制是什麼,下文中將會詳細講解。

網路層的可用性

首先講一下TCP,在dubbo中的通信是基於TCP的,TCP本身並沒有長短連接的區別,在短連接中,每次通信時,都會創建Socket,當該次通信結束後,就會調用socket.close();而在長連接中,每次通信完畢後,不會關閉連接,這樣就可以做到連接的復用,長連接的好處是省去了創建連接時的耗時。那麼如何確保連接的有效性呢,在TCP中用到了KeepAlive機制,keepalive並不是TCP協議的一部分,但是大多數操作系統都實現了這個機制,在一定時間內,在鏈路上如果沒有數據傳送的情況下,TCP層將會發送相應的keepalive探針來確定連接可用性,探測失敗後重試10次(tcp_keepalive_probes),每次間隔時間為75s(tcp_keepalive_intvl),所有探測失敗後,才認為當前連接已經不可用了。

KeepAlive機制是在網路層保證了連接的可用性,但在應用層我們認為這還是不夠的。

  • KeepAlive的報活機制只有在鏈路空閑的情況下才會起作用,假如此時有數據發送,且物理鏈路已經不通,操作系統這邊的鏈路狀態還是E STABLISHED,這時會發生TCP重傳機制,要知道預設的TCP超時重傳,指數退避演算法也是一個相當長的過程。
  • KeepAlive本身是面向網路的,並不是面向於應用的,可能是由於本身GC問題,系統load高等情況,但網路依然是通的,此時,應用已經失去了活性,所以連接自然認為是不可用的。

    應用層的連接可用性:心跳機制

    如何理解應用層的心跳?簡單的說,就是客戶端會開啟一個定時任務,定時對已經建立連接的對端應用發送請求,服務端則需要特殊處理該請求,返迴響應。如果心跳持續多次沒有收到響應,客戶端會認為連接不可用,主動斷開連接。

客戶端如何得知請求失敗了?

在失敗的場景下,服務端是不會返迴響應的,所以只能在客戶端自身上設計了。
當客戶端發起一個RPC請求時,會設置一個超時時間client_timeout,同時它也會開啟一個延遲的client_timeout的定時器。當接收到正常響應時,會移除該定時器;而當計時器倒計時完畢後,還沒有被移除,則會認為請求超時,構造一個失敗的響應傳遞給客戶端。

連接建立時創建定時器

HeaderExchangeClient類

 public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        // 創建信息交換通道
        this.channel = new HeaderExchangeChannel(client);
        // 獲得dubbo版本
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        //獲得心跳周期配置,如果沒有配置,並且dubbo是1.0版本的,則這隻為1分鐘,否則設置為0
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        // 獲得心跳超時配置,預設是心跳周期的三倍
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
                 
        if (needHeartbeat) {
            // 開啟心跳
          long tickDuration = calculateLeastDuration(heartbeat);
          heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true) , tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
          startHeartbeatTimer();
        }
    }

創建了一個HashedWheelTimer開啟心跳檢測,這是 Netty 所提供的一個經典的時間輪定時器實現。

HeaderExchangeServer也同時開啟了定時器,代碼邏輯和上述差不多。

開啟兩個定時任務

private void startHeartbeatTimer() {
           long heartbeatTick = calculateLeastDuration(heartbeat); 
   long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
   HeartbeatTimerTask heartBeatTimerTask =new  HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
   ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
    
  heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); 
  heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

在該方法中主要開啟了兩個定時器

  • HeartbeatTimerTask 主要是定時發送心跳請求
  • ReconnectTimerTask 主要是心跳失敗後處理重連,斷連的邏輯

舊版的心跳處理HeartBeatTask類

final class HeartBeatTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);

    /**
     * 通道管理
     */
    private ChannelProvider channelProvider;

    /**
     * 心跳間隔 單位:ms
     */
    private int heartbeat;

    /**
     * 心跳超時時間 單位:ms
     */
    private int heartbeatTimeout;

    HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
        this.channelProvider = provider;
        this.heartbeat = heartbeat;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    @Override
    public void run() {
        try {
            long now = System.currentTimeMillis();
            // 遍歷所有通道
            for (Channel channel : channelProvider.getChannels()) {
                // 如果通道關閉了,則跳過
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    // 最後一次接收到消息的時間戳
                    Long lastRead = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                    // 最後一次發送消息的時間戳
                    Long lastWrite = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    // 如果最後一次接收或者發送消息到時間到現在的時間間隔超過了心跳間隔時間
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        // 創建一個request
                        Request req = new Request();
                        // 設置版本號
                        req.setVersion(Version.getProtocolVersion());
                        // 設置需要得到響應
                        req.setTwoWay(true);
                        // 設置事件類型,為心跳事件
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        // 發送心跳請求
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
                    // 如果最後一次接收消息的時間到現在已經超過了超時時間
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        // 如果該通道是客戶端,也就是請求的伺服器掛掉了,客戶端嘗試重連伺服器
                        if (channel instanceof Client) {
                            try {
                                // 重新連接伺服器
                                ((Client) channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        } else {
                            // 如果不是客戶端,也就是是服務端返迴響應給客戶端,但是客戶端掛掉了,則服務端關閉客戶端連接
                            channel.close();
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }

    interface ChannelProvider {
        // 獲得所有的通道集合,需要心跳的通道數組
        Collection<Channel> getChannels();
    }

}

它首先遍歷所有的Channel,在服務端對用的是所有客戶端連接,在客戶端對應的是服務端連接,判斷當前TCP連接是否空閑,如果空閑就發送心跳報文,判斷是否空閑,根據Channel是否有讀或寫來決定,比如一分鐘內沒有讀或寫就發送心跳報文,然後是處理超時的問題,處理客戶端超時重新建立TCP連接,目前的策略是檢查是否在3分鐘內都沒有成功接受或發送報文,如果在服務端檢測則就會主動關閉遠程客戶端連接。

新版本的心跳機制

定時任務一: 發送心跳請求

在新版本下,去除了HeartBeatTask類,添加了HeartbeatTimerTask和ReconnectTimerTask類

public class HeartbeatTimerTask extends AbstractTimerTask {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimerTask.class);

    private final int heartbeat;

    HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) {
        super(channelProvider, heartbeatTick);
        this.heartbeat = heartbeat;
    }

    @Override
    protected void doTask(Channel channel) {
        try {
            Long lastRead = lastRead(channel);
            Long lastWrite = lastWrite(channel);
            if ((lastRead != null && now() - lastRead > heartbeat)
                    || (lastWrite != null && now() - lastWrite > heartbeat)) {
                Request req = new Request();
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay(true);
                req.setEvent(Request.HEARTBEAT_EVENT);
                channel.send(req);
                if (logger.isDebugEnabled()) {
                    logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period: "
                            + heartbeat + "ms");
                }
            }
        } catch (Throwable t) {
            logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
        }
    }
}

Dubbo採取的是雙向心跳設計,即服務端會向客戶端發送心跳,客戶端也會向服務端發送心跳,接收的一方更新lastread欄位,發送的一方更新lastWrite欄位,超過心跳間隙的時間,便發送心跳請求給對端。

定時任務二: 處理重連和斷連

public class ReconnectTimerTask extends AbstractTimerTask {

    private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class);

    private final int idleTimeout;

    public ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
        super(channelProvider, heartbeatTimeoutTick);
        this.idleTimeout = idleTimeout;
    }

    @Override
    protected void doTask(Channel channel) {
        try {
            Long lastRead = lastRead(channel);
            Long now = now();

            // Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
            if (!channel.isConnected()) {
                try {
                    logger.info("Initial connection to " + channel);
                    ((Client) channel).reconnect();
                } catch (Exception e) {
                    logger.error("Fail to connect to " + channel, e);
                }
            // check pong at client
            } else if (lastRead != null && now - lastRead > idleTimeout) {
                logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
                        + idleTimeout + "ms");
                try {
                    ((Client) channel).reconnect();
                } catch (Exception e) {
                    logger.error(channel + "reconnect failed during idle time.", e);
                }
            }
        } catch (Throwable t) {
            logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
        }
    }
}

不同類型處理機制不同,當超過設置的心跳總時間後,客戶端選擇的是重新連接,服務端是選擇直接斷開連接。

心跳改進方案

Netty對空閑連接的檢測提供了天然的支持,使用IdleStateHandler可以很方便的實現空閑檢測邏輯。

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit){}
  • readerIdleTime: 讀超時的時間
  • writerIdleTime: 寫超時的時間
  • allIdleTime: 所有類型的超時時間
    客戶端和服務端配置
    客戶端:
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast("clientIdleHandler", new IdleStateHandler(60, 0, 0));
    }
});

服務端:

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast("serverIdleHandler",new IdleStateHandler(0, 0, 200));
    }
}

從上面看出,客戶端配置了read超時為60s,服務端配置了write/read超時未200s,

空閑超時邏輯-客戶端

對於空閑超時的處理邏輯,客戶端和服務端是不同的,首先來看客戶端的:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        // send heartbeat
        sendHeartBeat();
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

檢測到空閑超時後,採取的行為是向服務端發送心跳包,

public void sendHeartBeat() {
    Invocation invocation = new Invocation();
    invocation.setInvocationType(InvocationType.HEART_BEAT);
    channel.writeAndFlush(invocation).addListener(new CallbackFuture() {
        @Override
        public void callback(Future future) {
            RPCResult result = future.get();
            //超時 或者 寫失敗
            if (result.isError()) {
                channel.addFailedHeartBeatTimes();
                if (channel.getFailedHeartBeatTimes() >= channel.getMaxHeartBeatFailedTimes()) {
                    channel.reconnect();
                }
            } else {
                channel.clearHeartBeatFailedTimes();
            }
        }
    });
}

構造一個心跳包發送到服務端,接受響應結果

  • 響應成功,清除請求失敗標記
  • 響應失敗,心跳失敗標記+1,如果超過配置的失敗次數,則重新連接

    空閑超時邏輯 - 服務端

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        channel.close();
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

服務端直接關閉連接。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 類載入與實例化 基本步驟 類裝載分為以下 5 個步驟: 載入:根據查找路徑找到相應的 class 文件然後導入 檢查:檢查載入的 class 文件的正確性 準備:給類中的靜態變數分配記憶體空間 解析:虛擬機將常量池中的符號引用替換成直接引用的過程。符號引用理解為一個標示,而直接引用直接指向記憶體中的地址 ...
  • 左傾堆,用於堆的快速合併。 規則: ① 節點的鍵值小於或等於它的左右子節點的鍵值。 ② 節點的左孩子的NPL >= 右孩子的NPL。 ③ 節點的NPL = 它的右孩子的NPL + 1。 測試文件 main.cpp: #include <iostream> #include "LeftistHeap. ...
  • 在最近一段時間里,通過搜集有關資料加上自己的理解,設計了一款輕量級RPC,起了一個名字 lightWeightRPC 。它擁有一個RPC常見的基本功能。主要功能和特點如下: 利用Spring實現依賴註入與參數配置 利用Netty來實現客戶端與服務端的遠程通信 利用Hessian來實現序列化 設置Zo ...
  • 在本版本中引入了SPI機制,關於Java的SPI機制與Dubbo的SPI機制在以前的文章中介紹過。 傳送門: "Dubbo的SPI機制與JDK機制的不同及原理分析" 因為設計的RPC框架是基於Spring的,時常會遇到依賴註入問題。Spring中也有SPI機制,但是它有有個缺點,就是在利用SPI機制 ...
  • 在前兩個版本中,每次發起請求一次就新建一個netty的channel連接,如果在高併發情況下就會造成資源的浪費,這時實現 非同步請求 就十分重要,當有多個請求線程時,需要設計一個 線程池 來進行管理。除此之外,當前方法過於依賴註冊中心,在高併發情況下對註冊中心造成了壓力;另外如果註冊中心出現宕機等情況 ...
  • 在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。 在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用n ...
  • 什麼是RPC RPC (Remote Procedure Call Protocol), 遠程過程調用,通俗的解釋就是:客戶端在不知道調用細節的情況下,調用存在於遠程電腦上的某個對象,就像調用本地應用程式中的對象一樣,不需要瞭解底層網路技術的協議。 簡單的整體工作流程 請求端發送一個調用的數據包, ...
  • 在dubbo中,關於註冊中心Registry的有關實現封裝在了dubbo registry模塊中。提供者(Provider)個消費者(Consumer)都是通過註冊中心進行資源的調度。當服務啟動時,provider會調用註冊中心的register方法將自己的服務通過url的方式發佈到註冊中心,而co ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...