ChannelPipeline----貫穿io事件處理的大動脈

来源:https://www.cnblogs.com/zhuge134/archive/2019/06/29/11105485.html
-Advertisement-
Play Games

ChannelPipeline貫穿io事件處理的大動脈 上一篇,我們分析了NioEventLoop及其相關類的主幹邏輯代碼,我們知道netty採用線程封閉的方式來避免多線程之間的資源競爭,最大限度地減少併發問題,減少鎖的使用,因而能夠有效減低線程切換的開銷,減少cpu的使用時間。此外,我們還簡單分析 ...


ChannelPipeline貫穿io事件處理的大動脈

上一篇,我們分析了NioEventLoop及其相關類的主幹邏輯代碼,我們知道netty採用線程封閉的方式來避免多線程之間的資源競爭,最大限度地減少併發問題,減少鎖的使用,因而能夠有效減低線程切換的開銷,減少cpu的使用時間。此外,我們還簡單分析了netty對於線程組的封裝EventLoopGroup,目前一般採用roundRobin的方式在多個線程上均勻地分配channel。通過前面幾篇文章的分析,我們已經對channel的初始化,註冊到EventLoop上,SingleThreadEventLoop的線程啟動過程以及線程中運行的代碼邏輯有了一些瞭解,此外我們也分析了用於處理基於TCP協議的io事件的NioEventLoop類的具體的迴圈邏輯,通過對代碼的詳細分析,我們瞭解了對於connect,write,read,accept事件的不同處理邏輯,但是對於write和read事件的處理邏輯我們並沒有分析的很詳細,因為這些事件的處理涉及到netty中另一個很重要的模塊,ChannelPipeline以及一系列相關的類如Channel, ChannelHandler, ChannelhandlerContext等的理解,netty中的事件處理採用了經典的責任鏈(responsbility chain)的的設計模式,這種設計模式使得netty的io事件處理框架易於擴展,並且為業務邏輯提供了一個很好的抽象模型,大大降低了netty的使用難度,使得io事件的處理變得更符合思維習慣。
好了,廢話了那麼多,其實主要是想把前面分析的幾篇的文章做一個小結和回顧,然後引出本篇的主題--netty的io事件處理鏈模式。
因為netty的代碼結構相對來說還是很規整,它的模塊之間的邊界劃分比較明確,EventLoop作為io事件的“發源地”,與其交互的對象是Channel類,而ChannelPipeline,ChannelhandlerContext, ChannelHandler等幾個類則是與Channel交互,他們並不直接與EventLoop交互。

ChannelPipeline的結構圖

首先每一個Channel在初始化的時候就會創建一個ChannelPipeline,這點我們在前面分析NioSocketChannel的初始化時也分析到了。目前ChannelPipeline的實現只有DefaultChannelPipeline一種,所以我們也以DefaultChannelPipeline來分析。DefaultChannelPipeline內部有一個雙向鏈表結構,這個鏈表的每個節點都是一個AbstractChannelHandlerContext類型的節點,DefaultChannelPipeline剛初始化時就會創建兩個初始節點,分別是HeadContext和TailContext,這兩個節點也並不完全是標記節點,他們都有各自實際的作用,

  • HeadContext,實現了bind,connect,disconnect,close,write,flush等等幾個方法,基本都是通過直接調用unsafe的相關方法實現的。而對於其他的方法基本都是通過調用AbstractChannelHandlerContext的fire方法將事件傳給下一個節點。
  • TailContext, 主要用於處理寫數據幾乎沒有實現任何邏輯,它的功能幾乎全部繼承自AbstractChannelHandlerContext,而AbstractChannelHandlerContext對於大部分事件處理的實現都是簡單地將事件向下一個節點傳遞。註意,這裡下一個節點不一定是前一個還是後一個,要根據具體事件類型或者具體的操作而定,對於ChannelOutboundInvoker介面中的方法都是從尾節點向首節點傳遞事件,而對於ChannelInboundInvoker介面中的方法都是從首節點往尾節點傳遞。我們可以形象地理解為,首節點是最靠近socket的,而尾節點是最原理socket的,所以有數據進來時,產生的讀事件最先從首節點開始向後傳遞,當有寫數據的動作時,則會從尾節點向頭結點傳遞。

下麵,我們以兩個最重要的事件讀事件和寫事件,來分析netty的這種鏈式處理結構到底是怎麼運轉的。

讀事件

首先,我們需要找到一個產生讀事件並調用相關方法使得讀事件開始傳遞的例子,很自然我們應該想到在EventLoop中會產生讀事件。
如下,就是NioEventLoop中對於讀事件的處理,通過調用NioUnsafe.read方法

       // 處理read和accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }

我們繼續看NioByteUnsafe.read方法,這個方法我們之前在分析NioEventLoop事件處理邏輯時提到過,這個方法首先會通過緩衝分配器分配一個緩衝,然後從channel(也就是socket)中將數據讀到緩衝中,每讀一個緩衝,就會觸發一個讀事件,我們看具體的觸發讀事件的調用:

            do {
                // 分配一個緩衝
                byteBuf = allocHandle.allocate(allocator);
                // 將通道的數據讀取到緩衝中
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                // 如果沒有讀取到數據,說明通道中沒有待讀取的數據了,
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    // 因為沒讀取到數據,所以應該釋放緩衝
                    byteBuf.release();
                    byteBuf = null;
                    // 如果讀取到的數據量是負數,說明通道已經關閉了
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }

                // 更新Handle內部的簿記量
                allocHandle.incMessagesRead(1);
                readPending = false;
                // 向channel的處理器流水線中觸發一個事件,
                // 讓取到的數據能夠被流水線上的各個ChannelHandler處理
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
                // 這裡根據如下條件判斷是否繼續讀:
                // 上一次讀取到的數據量大於0,並且讀取到的數據量等於分配的緩衝的最大容量,
                // 此時說明通道中還有待讀取的數據
            } while (allocHandle.continueReading());

為了代碼邏輯的完整性,我這裡把整個迴圈的代碼都貼上來,其實我們要關註的僅僅是pipeline.fireChannelRead(byteBuf)這一句,好了,現在我們找到ChannelPipeline觸發讀事件的入口方法,我們順著這個方法,順藤摸瓜就能一步步理清事件的傳遞過程了。

DefaultChannelPipeline.fireChannelRead

如果我們看一下ChannelPipeline介面,這裡面的方法名都是以fire開頭的,實際就是想表達這些方法都是觸發了一個事件,然後這個事件就會在內部的處理器鏈表中傳遞。
我們看到這裡調用了一個靜態方法,並且以頭結點為參數,也就是說事件傳遞是從頭結點開始的。

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

AbstractChannelHandlerContext.invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)

可以看到,這個方法中通過調用invokeChannelRead執行處理邏輯

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    // 維護引用計數,主要是為了偵測資源泄漏問題
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 調用invokeChannelRead執行處理邏輯
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

AbstractChannelHandlerContext.invokeChannelRead(Object msg)

這裡可以看到,AbstractChannelHandlerContext通過自己內部的handler對象來實現讀數據的邏輯。這也體現了ChannelHandlerContext在整個結構中的作用,其實它是起到了在ChannelPipeline和handler之間的一個中間人的角色,那我們要問:既然ChannelHandlerContext不起什麼實質性的作用,那為什麼要多這一個中間層呢,這樣設計的好處是什麼?我認為這樣設計其實是為了盡最大可能對使用者屏蔽netty框架的細節,試想如果沒有這個context的中間角色,使用者必然要詳細地瞭解ChannelPipeline,並且還要考慮事件傳遞是找下一個節點,還要考慮下一個節點應該沿著鏈表的正序找還是沿著鏈表 倒敘找,所以這裡ChannelHandlerContext的角色我認為最大的作用就是封裝了鏈表的邏輯,並且封裝了不同類型操作的傳播方式。當然也起到了一些引用傳遞的作用,如channel引用可以簡介地傳遞給用戶。
好了,回到正題,從前面的方法中我們知道讀事件最先是從HeadContext節點開始的,所以我們看一下HeadContext的channelRead方法(因為HeadContext也實現了handler方法,並且返回的就是自身)

private void invokeChannelRead(Object msg) {
    // 如果這個handler已經準備就緒,那麼就執行處理邏輯
    // 否則將事件傳遞給下一個處理器節點
    if (invokeHandler()) {
        try {
            // 調用內部的handler的channelRead方法
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

HeadContext.channelRead

這裡的調用也是一個重要的註意點,這裡調用了ChannelHandlerContext.fireChannelRead方法,這正是事件傳播的方法,fire開頭的方法的作用就是將當前的操作(或者叫事件)從當前處理節點傳遞給下一個處理節點。這樣就實現了事件在鏈表中的傳播。

   public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.fireChannelRead(msg);
    }

小結

到這裡我們先暫停一下,總結一下讀事件(或者是讀操作)在ChannelPipeline內部的傳播機制,其實很簡單,

  • 首先外部調用者會通過unsafe最終調用ChannelPipeline.fireChannelRead方法,並將從channel中讀取到的數據作為參數傳進來
  • 以頭結點作為參數調用靜態方法AbstractChannelHandlerContext.fireChannelRead
  • 然後頭結點HeadContext開始調用節點的invokeChannelRead方法(即ChannelHandlerContext的invokeChannelRead方法),
  • invokeChannelRead方法會調用當前節點的handler對象的channelRead方法執行處理邏輯
  • handler對象的channelRead方法中可以調用AbstractChannelHandlerContext.fireChannelRead將這個事件傳遞到下一個節點
  • 這樣事件就能夠沿著鏈條不斷傳遞下去,當然如果業務處理需要,完全可以在某個節點將事件的傳遞終止,也就是在這個節點不調用ChannelHandlerContext.fireChannelRead

寫事件

此外,我們分析一下寫數據的操作是怎麼傳播的。分析寫數據操作的入口並不想讀事件那麼好找,在netty中用戶的代碼中寫數據最終都是被放到內部的緩衝中,當NioEventLoop中監聽到底層的socket可以寫數據的事件時,實際上是吧當前緩衝中的數據發送到socket中,而對於用戶來講,是接觸不到socketChannel這一層的。
根據前面的分析,我們知道,用戶一般都會與Channel,ChannelHandler, ChannelhandlerContext這幾種類打交道,寫數據的操作也是通過Channel的write和writeAndFlush觸發的,這兩個方法區別在於writeAndFlush在寫完數據後還會觸發一次刷寫操作,將緩衝中的數據實際寫入到socket中。

AbstractChannel.write

仍然是將操作交給內部的ChannelPipeline,觸發流水線操作

public ChannelFuture write(Object msg, ChannelPromise promise) {
    return pipeline.write(msg, promise);
}

DefaultChannelPipeline.write

這裡可以很清楚地看出來,寫數據的操作從為節點開始,但是TailContext並未重寫write方法,所以最終調用的還是AbstractChannelHandlerContext中的相應方法。
我們沿著調用鏈往下走,發現write系列的方法其實是將寫操作傳遞給了下一個ChannelOutboundHandler類型的處理節點,註意這裡是從尾節點向前找,遍歷鏈表的順序和讀數據正好相反。
真正調用

public final ChannelFuture write(Object msg, ChannelPromise promise) {
    return tail.write(msg, promise);
}

AbstractChannelHandlerContext.write

從這個方法可以明顯地看出來,write方法將寫操作交給了下一個ChannelOutboundHandler類型的處理器節點。

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }

    // 沿著鏈表向前遍歷,找到下一個ChannelOutboundHandler類型的處理器節點
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            // 調用AbstractChannelHandlerContext.invokeWriteAndFlush方法執行真正的寫入邏輯
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // 如果當前是非同步地寫入數據,那麼需要將寫入的邏輯封裝成一個任務添加到EventLoop的任務對隊列中
        final AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        if (!safeExecute(executor, task, promise, m)) {
            // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
            // and put it back in the Recycler for re-use later.
            //
            // See https://github.com/netty/netty/issues/8343.
            task.cancel();
        }
    }
}

AbstractChannelHandlerContext.invokeWrite

我們接著看invokeWrite0方法

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}

AbstractChannelHandlerContext.invokeWrite0

這裡可以清楚地看到,最終是調用了handler的write方法執行真正的寫入邏輯,這個邏輯實際上就是有用戶自己實現的。

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        // 調用當前節點的handler的write方法執行真正的寫入邏輯
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

到這裡,我們已經知道寫入的操作是怎麼從尾節點開始,也知道了通過調用當前處理節點的AbstractChannelHandlerContext.write方法可以將寫入操作傳遞給下一個節點,那麼數據經過層層傳遞後,最終是怎麼寫到socket中的呢?回答這個問題,我們需要看一下HeadContext的代碼!我們知道寫入的操作是從尾節點向前傳遞的,那麼頭節點HeadContext就是傳遞的最後一個節點。

HeadContext.write

最終調用了unsafe.write方法。
在AbstractChannel.AbstractUnsafe的實現中,write方法將經過前面一系列處理器處理過的數據存放到內部的緩衝中。

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        unsafe.write(msg, promise);
    }

刷寫操作的傳遞

前面我們提到,寫數據的操作除了write還有writeAndFlush,這個操作除了寫數據,還會緊接著執行一次刷寫操作。刷寫操作也會從尾節點向前傳遞,最終傳遞到頭結點HeadContext,其中的flush方法如下:

    public void flush(ChannelHandlerContext ctx) {
        unsafe.flush();
    }

在AbstractChannel.AbstractUnsafe的實現中,flush操作會將前面存儲在內部緩衝區中的數據吸入到socket中,從而完成刷寫。

總結

本節,我們主要通過io事件處理中最重要的兩種事件,即讀事件和寫事件為切入點 詳細分析了netty中對於這兩種事件的處理方法。其中寫數據的事件與我們之前在jdk nio中建立起的印象差別還是不大的,都是對從socket中讀取的數據進行處理,但是寫事件跟jdk nio中的概念就有較大差別了,因為netty對數據的寫入做了很大的改變和優化,用戶代碼中通過channel調用相關的寫數據的方法,這個方法會觸發處理器鏈條上的所有相關的處理器對待寫入的數據進行加工,最後在頭結點HeadCOntext中被寫入channel內部的緩衝區,通過flush操作將緩衝的數據寫入socket中。
這裡面最重要的也是最值得我們學習的一點就是責任鏈模式,顯然,這又是一次對責任鏈模式的成功運用,是的框架的擴展性大大增強,而且面向用戶的介面更加容易理解,簡單易用,向用戶屏蔽了大部分框架實現細節。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前端HTML [TOC] HTML是什麼? Html標簽:超文本標記語言,就是標記用的. 超文本標記語言(Hypertext Markup Language, HTML)是一種用於創建網頁的標記語言。 本質上是瀏覽器可識別的規則,我們按照規則寫網頁,瀏覽器根據規則渲染我們的網頁。對於不同的瀏覽器,對 ...
  • 前言 一般可以使用cookie,localstorage,sessionStorage來實現瀏覽器端的數據緩存,減少對伺服器的請求。 1.cookie數據存放在本地硬碟中,只要在過期時間之前,都是有效的,即使重啟瀏覽器。但是會在每次HTTP請求中添加到請求頭中,如果數據過多,會造成性能問題。 2.s ...
  • 沒有指定<a>的target屬性值時,預設是"_blank,可以參考以下代碼的設置來控制<a href="#" target="_blank">鏈接</a>target的值target="_blank":在新視窗中瀏覽新的頁面。target="_self":在同一個視窗打開新的頁面。target=" ...
  • JS定時器 ...
  • 註意外邊距摺疊 註意外邊距摺疊 與其他大多數屬性不同,上下的垂直外邊距margin在同時存在時會發生外邊距摺疊。這意味著當一個元素的下邊緣接觸到另一個元素的上邊緣時,只會保留兩個margin值中較大的那個。解決外邊距摺疊的方法有很多種,對於初學者來說最簡單的就是所有元素只使用一個方向上的margin ...
  • git 碼雲地址: https://gitee.com/qichangshui_admin/vueAddTest 參考地址: https://www.jianshu.com/p/bfcfe5fc253b ...
  • [toc] 首發日期:2019 06 27 使用的技術 vue.js+element UI+mock.js vue.js是核心 element UI是一個支持vue.js的UI組件庫 mock.js:由於沒有後端介面,所以需要使用本地數據來假裝成後端的數據。 溫馨提示:如果以下內容超出了你的知識面, ...
  • elementUi vue el-radio 監聽選中變化 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...