一文聊透 Netty IO 事件的編排利器 pipeline | 詳解所有 IO 事件的觸發時機以及傳播路徑

来源:https://www.cnblogs.com/binlovetech/archive/2022/07/09/16460633.html
-Advertisement-
Play Games

歡迎關註公眾號:bin的技術小屋,本文圖片載入不出來的話可查看公眾號原文 本系列Netty源碼解析文章基於 4.1.56.Final版本 1. 前文回顧 在前邊的系列文章中,筆者為大家詳細剖析了 Reactor 模型在 netty 中的創建,啟動,運行,接收連接,接收數據,發送數據的完整流程,在詳細 ...


歡迎關註公眾號:bin的技術小屋,本文圖片載入不出來的話可查看公眾號原文

本系列Netty源碼解析文章基於 4.1.56.Final版本

1. 前文回顧

在前邊的系列文章中,筆者為大家詳細剖析了 Reactor 模型在 netty 中的創建啟動運行接收連接接收數據發送數據的完整流程,在詳細剖析整個 Reactor 模型如何在 netty 中實現的過程里,我們或多或少的見到了 pipeline 的身影。

image

比如在 Reactor 啟動的過程中首先需要創建 NioServerSocketChannel ,在創建的過程中會為 NioServerSocketChannel 創建分配一個 pipeline ,用於對 OP_ACCEPT 事件的編排。

當 NioServerSocketChannel 向 main reactor 註冊成功後,會在 pipeline 中觸發 ChannelRegistered 事件的傳播。

當 NioServerSocketChannel 綁定埠成功後,會在 pipeline 中觸發 ChannelActive 事件的傳播。

image

又比如在 Reactor 接收連接的過程中,當客戶端發起一個連接並完成三次握手之後,連接對應的 Socket 會存放在內核中的全連接隊列中,隨後 JDK Selector 會通知 main reactor 此時 NioServerSocketChannel 上有 OP_ACCEPT 事件活躍,最後 main reactor 開始執行 NioServerSocketChannel 的底層操作類 NioMessageUnsafe#read 方法在 NioServerSocketChannel 中的 pipeline 中傳播 ChannelRead 事件。

image

最終會在 NioServerSocketChannel 的 pipeline 中的 ServerBootstrapAcceptor 中響應 ChannelRead 事件並創建初始化 NioSocketChannel ,隨後會為每一個新創建的 NioSocetChannel 創建分配一個獨立的 pipeline ,用於各自 NioSocketChannel 上的 IO 事件的編排。並向 sub reactor 註冊 NioSocketChannel ,隨後在 NioSocketChannel 的 pipeline 中傳播 ChannelRegistered 事件,最後傳播 ChannelActive 事件。

image

還有在《Netty如何高效接收網路數據》一文中,我們也提過當 sub reactor 讀取 NioSocketChannel 中來自客戶端的請求數據時,會在 NioSocketChannel 的 pipeline 中傳播 ChannelRead 事件,在一個完整的 read loop 讀取完畢後會傳播 ChannelReadComplete 事件。

《一文搞懂Netty發送數據全流程》一文中,我們講到了在用戶經過業務處理後,通過 write 方法和 flush 方法分別在 NioSocketChannel 的 pipeline 中傳播 write 事件和 flush 事件的過程。

筆者帶大家又回顧了一下在前邊系列文章中關於 pipeline 的使用場景,但是在這些系列文章中並未對 pipeline 相關的細節進行完整全面地描述,那麼本文筆者將為大家詳細的剖析下 pipeline 在 IO 事件的編排和傳播場景下的完整實現原理。

image

2. pipeline的創建

image

Netty 會為每一個 Channel 分配一個獨立的 pipeline ,pipeline 伴隨著 channel 的創建而創建。

前邊介紹到 NioServerSocketChannel 是在 netty 服務端啟動的過程中創建的。而 NioSocketChannel 的創建是在當 NioServerSocketChannel 上的 OP_ACCEPT 事件活躍時,由 main reactor 線程在 NioServerSocketChannel 中創建,併在 NioServerSocketChannel 的 pipeline 中對 OP_ACCEPT 事件進行編排時(圖中的 ServerBootstrapAcceptor 中)初始化的。

無論是創建 NioServerSocketChannel 里的 pipeline 還是創建 NioSocketChannel 里的 pipeline , 最終都會委托給它們的父類 AbstractChannel 。

image

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel全局唯一ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe用於底層socket的相關操作
        unsafe = newUnsafe();
        //為channel分配獨立的pipeline用於IO事件編排
        pipeline = newChannelPipeline();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

}
public class DefaultChannelPipeline implements ChannelPipeline {

      ....................

    //pipeline中的頭結點
    final AbstractChannelHandlerContext head;
    //pipeline中的尾結點
    final AbstractChannelHandlerContext tail;

    //pipeline中持有對應channel的引用
    private final Channel channel;

       ....................

    protected DefaultChannelPipeline(Channel channel) {
        //pipeline中持有對應channel的引用
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        
        ............省略.......

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

       ....................
}

在前邊的系列文章中筆者多次提到過,pipeline 的結構是由 ChannelHandlerContext 類型的節點構成的雙向鏈表。其中頭結點為 HeadContext ,尾結點為 TailContext 。其初始結構如下:

image

2.1 HeadContext

    private static final String HEAD_NAME = generateName0(HeadContext.class);

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
       //headContext中持有對channel unsafe操作類的引用 用於執行channel底層操作
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, HeadContext.class);
            //持有channel unsafe操作類的引用,後續用於執行channel底層操作
            unsafe = pipeline.channel().unsafe();
            //設置channelHandler的狀態為ADD_COMPLETE
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }

        .......................
    }

我們知道雙向鏈表結構的 pipeline 中的節點元素為 ChannelHandlerContext ,既然 HeadContext 作為 pipeline 的頭結點,那麼它一定是 ChannelHandlerContext 類型的,所以它需要繼承實現 AbstractChannelHandlerContext ,相當於一個哨兵的作用,因為用戶可以以任意順序向 pipeline 中添加 ChannelHandler ,需要用 HeadContext 來固定指向第一個 ChannelHandlerContext 。

《一文搞懂Netty發送數據全流程》 一文中的《1. ChannelHandlerContext》小節中,筆者曾為大家詳細介紹過 ChannelHandlerContext 在 pipeline 中的作用,忘記的同學可以在回看下。

於此同時 HeadContext 又實現了 ChannelInboundHandler 和 ChannelOutboundHandler 介面,說明 HeadContext 即是一個 ChannelHandlerContext 又是一個 ChannelHandler ,它可以同時處理 Inbound 事件和 Outbound 事件。

我們也註意到 HeadContext 中持有了對應 channel 的底層操作類 unsafe ,這也說明 IO 事件在 pipeline 中的傳播最終會落在 HeadContext 中進行最後的 IO 處理。它是 Inbound 事件的處理起點,也是 Outbound 事件的處理終點。這裡也可以看出 HeadContext 除了起到哨兵的作用,它還承擔了對 channel 底層相關的操作。

比如我們在《Reactor在Netty中的實現(啟動篇)》中介紹的 NioServerSocketChannel 在向 main reactor 註冊完成後會觸發 ChannelRegistered 事件從 HeadContext 開始依次在 pipeline 中向後傳播。

      @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            //此時firstRegistration已經變為false,在pipeline.invokeHandlerAddedIfNeeded中已被調用過
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }

以及 NioServerSocketChannel 在與埠綁定成功後會觸發 ChannelActive 事件從 HeadContext 開始依次在 pipeline 中向後傳播,併在 HeadContext 中通過 unsafe.beginRead() 註冊 OP_ACCEPT 事件到 main reactor 中。

     @Override
        public void read(ChannelHandlerContext ctx) {
            //觸發註冊OP_ACCEPT或者OP_READ事件
            unsafe.beginRead();
        }

同理在 NioSocketChannel 在向 sub reactor 註冊成功後。會先後觸發 ChannelRegistered 事件和 ChannelActive 事件從 HeadContext 開始在 pipeline 中向後傳播。併在 HeadContext 中通過 unsafe.beginRead() 註冊 OP_READ 事件到 sub reactor 中。

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            //pipeline中繼續向後傳播channelActive事件
            ctx.fireChannelActive();
            //如果是autoRead 則自動觸發read事件傳播
            //在read回調函數中 觸發OP_ACCEPT或者OP_READ事件註冊
            readIfIsAutoRead();
        }

《一文搞懂Netty發送數據全流程》中介紹的 write 事件和 flush 事件最終會在 pipeline 中從後向前一直傳播到 HeadContext ,併在 HeadContext 中相應事件回調函數中調用 unsafe 類操作底層 channel 發送數據。

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            //到headContext這裡 msg的類型必須是ByteBuffer,也就是說必須經過編碼器將業務層寫入的實體編碼為ByteBuffer
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }

從本小節的內容介紹中,我們可以看出在 Netty 中對於 Channel 的相關底層操作調用均是在 HeadContext 中觸發的。

2.2 TailContext

    private static final String TAIL_NAME = generateName0(TailContext.class);

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, TailContext.class);
            //設置channelHandler的狀態為ADD_COMPLETE
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            return this;
        }
    
        ......................
}

同樣 TailContext 作為雙向鏈表結構的 pipeline 中的尾結點,也需要繼承實現 AbstractChannelHandlerContext 。但它同時又實現了 ChannelInboundHandler 。

這說明 TailContext 除了是一個 ChannelHandlerContext 同時也是一個 ChannelInboundHandler 。

2.2.1 TailContext 作為一個 ChannelHandlerContext 的作用

TailContext 作為一個 ChannelHandlerContext 的作用是負責將 outbound 事件從 pipeline 的末尾一直向前傳播直到 HeadContext 。當然前提是用戶需要調用 channel 的相關 outbound 方法。

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

    ChannelFuture write(Object msg);

    ChannelFuture write(Object msg, ChannelPromise promise);

    ChannelOutboundInvoker flush();

    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    ChannelFuture writeAndFlush(Object msg);

}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

    @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return pipeline.writeAndFlush(msg);
    }
}
public class DefaultChannelPipeline implements ChannelPipeline {

   @Override
    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

    @Override
    public final ChannelPipeline flush() {
        tail.flush();
        return this;
    }

   @Override
    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }

}

這裡我們可以看到,當我們在自定義 ChannelHandler 中調用 ctx.channel().write(msg) 時,會在 AbstractChannel 中觸發 pipeline.write(msg) ,最終在 DefaultChannelPipeline 中調用 tail.write(msg) 。使得 write 事件可以從 pipeline 的末尾開始向前傳播,其他 outbound 事件的傳播也是一樣的道理。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        ctx.channel().write(msg);
    }

}

而我們自定義的 ChannelHandler 會被封裝在一個 ChannelHandlerContext 中從而加入到 pipeline 中,而這個用於裝載自定義 ChannelHandler 的 ChannelHandlerContext 與 TailContext 一樣本質也都是 ChannelHandlerContext ,只不過在 pipeline 中的位置不同罷了。

image

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    ChannelFuture write(Object msg);

    ChannelFuture write(Object msg, ChannelPromise promise);

    ChannelOutboundInvoker flush();

    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    ChannelFuture writeAndFlush(Object msg);

}

我們看到 ChannelHandlerContext 介面本身也會繼承 ChannelInboundInvoker
和 ChannelOutboundInvoker 介面,所以說 ContextHandlerContext 也可以觸發 inbound 事件和 outbound 事件,只不過表達的語義是在 pipeline 中從當前 ChannelHandler 開始向前或者向後傳播 outbound 事件或者 inbound 事件。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        ctx.write(msg);
    }

}

這裡表示 write 事件從當前 EchoServerHandler 開始在 pipeline 中向前傳播直到 HeadContext 。

image

2.2.2 TailContext 作為一個 ChannelInboundHandler 的作用

最後 TailContext 作為一個 ChannelInboundHandler 的作用就是為 inbound 事件在 pipeline 中的傳播做一個兜底的處理。

這裡提到的兜底處理是什麼意思呢?

比如我們前邊介紹到的,在 NioSocketChannel 向 sub reactor 註冊成功後之後觸發的 ChannelRegistered 事件和 ChannelActive 事件。或者在 reactor 線程讀取 NioSocketChannel 中的請求數據時所觸發的 channelRead 事件和 ChannelReadComplete 事件。

這些 inbound 事件都會首先從 HeadContext 開始在 pipeline 中一個一個的向後傳遞。

極端的情況是如果 pipeline 中所有 ChannelInboundHandler 中相應的 inbound 事件回調方法均不對事件作出處理,並繼續向後傳播。如下示例代碼所示:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.fireChannelReadComplete();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }
}

最終這些 inbound 事件在 pipeline 中得不到處理,最後會傳播到 TailContext 中。

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            onUnhandledInboundMessage(ctx, msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            onUnhandledInboundChannelReadComplete();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            onUnhandledInboundChannelActive();
        }

}

而在 TailContext 中需要對這些得不到任何處理的 inbound 事件做出最終處理。比如丟棄該 msg,並釋放所占用的 directByteBuffer,以免發生記憶體泄露。

    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
        if (logger.isDebugEnabled()) {
            logger.debug("Discarded message pipeline : {}. Channel : {}.",
                         ctx.pipeline().names(), ctx.channel());
        }
    }

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

3. pipeline中的事件分類

在前邊的系列文章中,筆者多次介紹過,Netty 中的 IO 事件一共分為兩大類: inbound 類事件和 outbound 類事件。其實如果嚴格來分的話應該分為三類。第三種事件類型為 exceptionCaught 異常事件類型。

而 exceptionCaught 事件在事件傳播角度上來說和 inbound 類事件一樣,都是從 pipeline 的 HeadContext 開始一直向後傳遞或者從當前 ChannelHandler 開始一直向後傳遞直到 TailContext 。所以一般也會將 exceptionCaught 事件統一歸為 inbound 類事件。

而根據事件類型的分類,相應負責處理事件回調的 ChannelHandler 也會被分為兩類:

  • ChannelInboundHandler :主要負責響應處理 inbound 類事件回調和 exceptionCaught 事件回調。

  • ChannelOutboundHandler :主要負責響應處理 outbound 類事件回調。

那麼我們常說的 inbound 類事件和 outbound 類事件具體都包含哪些事件呢?

3.1 inbound類事件

final class ChannelHandlerMask {

    // inbound事件集合
    static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;

    private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;

    // inbound 類事件相關掩碼
    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;

}

netty 會將其支持的所有非同步事件用掩碼來表示,定義在 ChannelHandlerMask 類中, netty 框架通過這些事件掩碼可以很方便的知道用戶自定義的 ChannelHandler 是屬於什麼類型的(ChannelInboundHandler or ChannelOutboundHandler )。

除此之外,inbound 類事件如此之多,用戶也並不是對所有的 inbound 類事件感興趣,用戶可以在自定義的 ChannelInboundHandler 中覆蓋自己感興趣的 inbound 事件回調,從而達到針對特定 inbound 事件的監聽。

這些用戶感興趣的 inbound 事件集合同樣也會用掩碼的形式保存在自定義 ChannelHandler 對應的 ChannelHandlerContext 中,這樣當特定 inbound 事件在 pipeline 中開始傳播的時候,netty 可以根據對應 ChannelHandlerContext 中保存的 inbound 事件集合掩碼來判斷,用戶自定義的 ChannelHandler 是否對該 inbound 事件感興趣,從而決定是否執行用戶自定義 ChannelHandler 中的相應回調方法或者跳過對該 inbound 事件不感興趣的 ChannelHandler 繼續向後傳播。

從以上描述中,我們也可以窺探出,Netty 引入 ChannelHandlerContext 來封裝 ChannelHandler 的原因,在代碼設計上還是遵循單一職責的原則, ChannelHandler 是用戶接觸最頻繁的一個 netty 組件,netty 希望用戶能夠把全部註意力放在最核心的 IO 處理上,用戶只需要關心自己對哪些非同步事件感興趣並考慮相應的處理邏輯即可,而並不需要關心非同步事件在 pipeline 中如何傳遞,如何選擇具有執行條件的 ChannelHandler 去執行或者跳過。這些切麵性質的邏輯,netty 將它們作為上下文信息全部封裝在 ChannelHandlerContext 中由netty框架本身負責處理。

以上這些內容,筆者還會在事件傳播相關小節做詳細的介紹,之所以這裡引出,還是為了讓大家感受下利用掩碼進行集合操作的便利性,netty 中類似這樣的設計還有很多,比如前邊系列文章中多次提到過的,channel 再向 reactor 註冊 IO 事件時,netty 也是將 channel 感興趣的 IO 事件用掩碼的形式存儲於 SelectionKey 中的 int interestOps 中。

接下來筆者就為大家介紹下這些 inbound 事件,並梳理出這些 inbound 事件的觸發時機。方便大家根據各自業務需求靈活地進行監聽。

3.1.1 ExceptionCaught 事件

在本小節介紹的這些 inbound 類事件在 pipeline 中傳播的過程中,如果在相應事件回調函數執行的過程中發生異常,那麼就會觸發對應 ChannelHandler 中的 exceptionCaught 事件回調。

    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                        "An exception {}" +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:",
                        ThrowableUtil.stackTraceToString(error), cause);
                } else if (logger.isWarnEnabled()) {
                    logger.warn(
                        "An exception '{}' [enable DEBUG level for full stacktrace] " +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:", error, cause);
                }
            }
        } else {
            fireExceptionCaught(cause);
        }
    }

當然用戶可以選擇在 exceptionCaught 事件回調中是否執行 ctx.fireExceptionCaught(cause) 從而決定是否將 exceptionCaught 事件繼續向後傳播。

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ..........
        ctx.fireExceptionCaught(cause);
    }

當 netty 內核處理連接的接收,以及數據的讀取過程中如果發生異常,會在整個 pipeline 中觸發 exceptionCaught 事件的傳播。

這裡筆者為什麼要單獨強調在 inbound 事件傳播的過程中發生異常,才會回調 exceptionCaught 呢 ?

因為 inbound 事件一般都是由 netty 內核觸發傳播的,而 outbound 事件一般都是由用戶選擇觸發的,比如用戶在處理完業務邏輯觸發的 write 事件或者 flush 事件。

而在用戶觸發 outbound 事件後,一般都會得到一個 ChannelPromise 。用戶可以向 ChannelPromise 添加各種 listener 。當 outbound 事件在傳播的過程中發生異常時,netty 會通知用戶持有的這個 ChannelPromise ,但不會觸發 exceptionCaught 的回調

比如我們在《一文搞懂Netty發送數據全流程》一文中介紹到的在 write 事件傳播的過程中就不會觸發 exceptionCaught 事件回調。只是去通知用戶的 ChannelPromise 。

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            //調用當前ChannelHandler中的write方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

    private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
        PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
    }

而 outbound 事件中只有 flush 事件的傳播是個例外,當 flush 事件在 pipeline 傳播的過程中發生異常時,會觸發對應異常 ChannelHandler 的 exceptionCaught 事件回調。因為 flush 方法的簽名中不會給用戶返回 ChannelPromise 。

    @Override
    ChannelHandlerContext flush();
    private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    }

3.1.2 ChannelRegistered 事件

當 main reactor 在啟動的時候,NioServerSocketChannel 會被創建並初始化,隨後就會向main reactor註冊,當註冊成功後就會在 NioServerSocketChannel 中的 pipeline 中傳播 ChannelRegistered 事件。

當 main reactor 接收客戶端發起的連接後,NioSocketChannel 會被創建並初始化,隨後會向 sub reactor 註冊,當註冊成功後會在 NioSocketChannel 中的 pipeline 傳播 ChannelRegistered 事件。

image

private void register0(ChannelPromise promise) {

        ................
        //執行真正的註冊操作
        doRegister();

        ...........

        //觸發channelRegister事件
        pipeline.fireChannelRegistered();

        .......
}

註意:此時對應的 channel 還沒有註冊 IO 事件到相應的 reactor 中。

3.1.3 ChannelActive 事件

當 NioServerSocketChannel 再向 main reactor 註冊成功並觸發 ChannelRegistered 事件傳播之後,隨後就會在 pipeline 中觸發 bind 事件,而 bind 事件是一個 outbound 事件,會從 pipeline 中的尾結點 TailContext 一直向前傳播最終在 HeadContext 中執行真正的綁定操作。

     @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            //觸發AbstractChannel->bind方法 執行JDK NIO SelectableChannel 執行底層綁定操作
            unsafe.bind(localAddress, promise);
        }
       @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
             ..............

            doBind(localAddress);

            ...............

            //綁定成功後 channel激活 觸發channelActive事件傳播
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        //HeadContext->channelActive回調方法 執行註冊OP_ACCEPT事件
                        pipeline.fireChannelActive();
                    }
                });
            }
  
            ...............
        }

當 netty 服務端 NioServerSocketChannel 綁定埠成功之後,才算是真正的 Active ,隨後觸發 ChannelActive 事件在 pipeline 中的傳播。

之前我們也提到過判斷 NioServerSocketChannel 是否 Active 的標準就是 : 底層 JDK Nio ServerSocketChannel 是否 open 並且 ServerSocket 是否已經完成綁定。

    @Override
    public boolean isActive() {
        return isOpen() && javaChannel().socket().isBound();
    }

而客戶端 NioSocketChannel 中觸發 ChannelActive 事件就會比較簡單,當 NioSocketChannel 再向 sub reactor 註冊成功並觸發 ChannelRegistered 之後,緊接著就會觸發 ChannelActive 事件在 pipeline 中傳播。

image

private void register0(ChannelPromise promise) {

        ................
        //執行真正的註冊操作
        doRegister();

        ...........

        //觸發channelRegister事件
        pipeline.fireChannelRegistered();

        .......

        if (isActive()) {

                    if (firstRegistration) {
                        //觸發channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
          }
}

而客戶端 NioSocketChannel 是否 Active 的標識是:底層 JDK NIO
SocketChannel 是否 open 並且底層 socket 是否連接。毫無疑問,這裡的 socket 一定是 connected 。所以直接觸發 ChannelActive 事件。

    @Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

註意:此時 channel 才會到相應的 reactor 中去註冊感興趣的 IO 事件。當用戶自定義的 ChannelHandler 接收到 ChannelActive 事件時,表明 IO 事件已經註冊到 reactor 中了。

3.1.4 ChannelRead 和 ChannelReadComplete 事件

image

當客戶端有新連接請求的時候,服務端的 NioServerSocketChannel 上的 OP_ACCEPT 事件會活躍,隨後 main reactor 會在一個 read loop 中不斷的調用 serverSocketChannel.accept() 接收新的連接直到全部接收完畢或者達到 read loop 最大次數 16 次。

在 NioServerSocketChannel 中,每 accept 一個新的連接,就會在 pipeline 中觸發 ChannelRead 事件。一個完整的 read loop 結束之後,會觸發 ChannelReadComplete 事件。

    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        @Override
        public void read() {
            ......................


                try {
                    do {
                        //底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        .................
                    } while (allocHandle.continueReading());

                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {            
                    pipeline.fireChannelRead(readBuf.get(i));
                }

                pipeline.fireChannelReadComplete();

                     .................
        }
    }

當客戶端 NioSocketChannel 上有請求數據到來時,NioSocketChannel 上的 OP_READ 事件活躍,隨後 sub reactor 也會在一個 read loop 中對 NioSocketChannel 中的請求數據進行讀取直到讀取完畢或者達到 read loop 的最大次數 16 次。

在 read loop 的讀取過程中,每讀取一次就會在 pipeline 中觸發 ChannelRead 事件。當一個完整的 read loop 結束之後,會在 pipeline 中觸發 ChannelReadComplete 事件。

image

這裡需要註意的是當 ChannelReadComplete 事件觸發時,此時並不代表 NioSocketChannel 中的請求數據已經讀取完畢,可能的情況是發送的請求數據太多,在一個 read loop 中讀取不完達到了最大限制次數 16 次,還沒全部讀取完畢就退出了 read loop 。一旦退出 read loop 就會觸發 ChannelReadComplete 事件。詳細內容可以查看筆者的這篇文章《Netty如何高效接收網路數據》

3.1.5 ChannelWritabilityChanged 事件

當我們處理完業務邏輯得到業務處理結果後,會調用 ctx.write(msg) 觸發 write 事件在 pipeline 中的傳播。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
         ctx.write(msg);
    }

最終 netty 會將發送數據 msg 寫入 NioSocketChannel 中的待發送緩衝隊列 ChannelOutboundBuffer 中。並等待用戶調用 flush 操作從 ChannelOutboundBuffer 中將待發送數據 msg ,寫入到底層 Socket 的發送緩衝區中。

image

當對端的接收處理速度非常慢或者網路狀況極度擁塞時,使得 TCP 滑動視窗不斷的縮小,這就導致發送端的發送速度也變得越來越小,而此時用戶還在不斷的調用 ctx.write(msg) ,這就會導致 ChannelOutboundBuffer 會急劇增大,從而可能導致 OOM 。netty 引入了高低水位線來控制 ChannelOutboundBuffer 的記憶體占用。

public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
}

當 ChanneOutboundBuffer 中的記憶體占用量超過高水位線時,netty 就會將對應的 channel 置為不可寫狀態,併在 pipeline 中觸發 ChannelWritabilityChanged 事件。

    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0) {
                    //觸發fireChannelWritabilityChanged事件 表示當前channel變為不可寫
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

當 ChannelOutboundBuffer 中的記憶體占用量低於低水位線時,netty 又會將對應的 NioSocketChannel 設置為可寫狀態,並再次觸發 ChannelWritabilityChanged 事件。

image

    private void setWritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue & ~1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue != 0 && newValue == 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

用戶可在自定義 ChannelHandler 中通過 ctx.channel().isWritable() 判斷當前 channel 是否可寫。

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

        if (ctx.channel().isWritable()) {
            ...........當前channel可寫.........
        } else {
            ...........當前channel不可寫.........
        }
    }

3.1.6 UserEventTriggered 事件

netty 提供了一種事件擴展機制可以允許用戶自定義非同步事件,這樣可以使得用戶能夠靈活的定義各種複雜場景的處理機制。

下麵我們來看下如何在 Netty 中自定義非同步事件。

  1. 定義非同步事件。
public final class OurOwnDefinedEvent {
 
    public static final OurOwnDefinedEvent INSTANCE = new OurOwnDefinedEvent();

    private OurOwnDefinedEvent() { }
}
  1. 觸發自定義事件的傳播
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
            ......省略.......
            //事件在pipeline中從當前ChannelHandlerContext開始向後傳播
            ctx.fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);
            //事件從pipeline的頭結點headContext開始向後傳播
            ctx.channel().pipeline().fireUserEventTriggered(OurOwnDefinedEvent.INSTANCE);

    }
}
     
  1. 自定義事件的響應和處理。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (OurOwnDefinedEvent.INSTANCE == evt) {
              .....自定義事件處理......
        }
    }

}

後續隨著我們源碼解讀的深入,我們還會看到 Netty 自己本身也定義了許多 UserEvent 事件,我們後面還會在介紹,大家這裡只是稍微瞭解一下相關的用法即可。

3.1.7 ChannelInactive和ChannelUnregistered事件

當 Channel 被關閉之後會在 pipeline 中先觸發 ChannelInactive 事件的傳播然後在觸發 ChannelUnregistered 事件的傳播。

我們可以在 Inbound 類型的 ChannelHandler 中響應 ChannelInactive 和 ChannelUnregistered 事件。

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        
        ......響應inActive事件...
        
        //繼續向後傳播inActive事件
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        
          ......響應Unregistered事件...

        //繼續向後傳播Unregistered事件
        super.channelUnregistered(ctx);
    }

這裡和連接建立之後的事件觸發順序正好相反,連接建立之後是先觸發 ChannelRegistered 事件然後在觸發 ChannelActive 事件。

3.2 Outbound 類事件

final class ChannelHandlerMask {

    // outbound 事件的集合
    static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

    private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
    
    // outbound 事件掩碼
    static final int MASK_BIND = 1 << 9;
    static final int MASK_CONNECT = 1 << 10;
    static final int MASK_DISCONNECT = 1 << 11;
    static final int MASK_CLOSE = 1 << 12;
    static final int MASK_DEREGISTER = 1 << 13;
    static final int MASK_READ = 1 << 14;
    static final int MASK_WRITE = 1 << 15;
    static final int MASK_FLUSH = 1 << 16;
}

和 Inbound 類事件一樣,Outbound 類事件也有對應的掩碼表示。下麵我們來看下 Outbound類事件的觸發時機:

3.2.1 read 事件

大家這裡需要註意區分 read 事件和 ChannelRead 事件的不同

ChannelRead 事件前邊我們已經介紹了,當 NioServerSocketChannel 接收到新連接時,會觸發 ChannelRead 事件在其 pipeline 上傳播。

當 NioSocketChannel 上有請求數據時,在 read loop 中讀取請求數據時會觸發 ChannelRead 事件在其 pipeline 上傳播。

而 read 事件則和 ChannelRead 事件完全不同,read 事件特指使 Channel 具備感知 IO 事件的能力。NioServerSocketChannel 對應的 OP_ACCEPT 事件的感知能力,NioSocketChannel 對應的是 OP_READ 事件的感知能力。

read 事件的觸發是在當 channel 需要向其對應的 reactor 註冊讀類型事件時(比如 OP_ACCEPT 事件 和 OP_READ 事件)才會觸發。read 事件的響應就是將 channel 感興趣的 IO 事件註冊到對應的 reactor 上。

比如 NioServerSocketChannel 感興趣的是 OP_ACCEPT 事件, NioSocketChannel 感興趣的是 OP_READ 事件。

在前邊介紹 ChannelActive 事件時我們提到,當 channel 處於 active 狀態後會在 pipeline 中傳播 ChannelActive 事件。而在 HeadContext 中的 ChannelActive 事件回調中會觸發 Read 事件的傳播。

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();  
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                //如果是autoRead 則觸發read事件傳播
                channel.read();
            }
        }

        @Override
        public void read(ChannelHandlerContext ctx) {
            //觸發註冊OP_ACCEPT或者OP_READ事件
            unsafe.beginRead();
        }
 }

而在 HeadContext 中的 read 事件回調中會調用 Channel 的底層操作類 unsafe 的 beginRead 方法,在該方法中會向 reactor 註冊 channel 感興趣的 IO 事件。對於 NioServerSocketChannel 來說這裡註冊的就是 OP_ACCEPT 事件,對於 NioSocketChannel 來說這裡註冊的則是 OP_READ 事件。

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();

        if ((interestOps & readInterestOp) == 0) {
            //註冊監聽OP_ACCEPT或者OP_READ事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

細心的同學可能註意到了 channel 對應的配置類中包含了一個 autoRead 屬性,那麼這個 autoRead 到底是乾什麼的呢?

其實這是 netty 為大家提供的一種背壓機制,用來防止 OOM ,想象一下當對端發送數據非常多並且發送速度非常快,而服務端處理速度非常慢,一時間消費不過來。而對端又在不停的大量發送數據,服務端的 reactor 線程不得不在 read loop 中不停的讀取,並且為讀取到的數據分配 ByteBuffer 。而服務端業務線程又處理不過來,這就導致了大量來不及處理的數據占用了大量的記憶體空間,從而導致 OOM 。

面對這種情況,我們可以通過 channelHandlerContext.channel().config().setAutoRead(false) 將 autoRead 屬性設置為 false 。隨後 netty 就會將 channel 中感興趣的讀類型事件從 reactor 中註銷,從此 reactor 不會再對相應事件進行監聽。這樣 channel 就不會在讀取數據了。

這裡 NioServerSocketChannel 對應的是 OP_ACCEPT 事件, NioSocketChannel 對應的是 OP_READ 事件。

        protected final void removeReadOp() {
            SelectionKey key = selectionKey();
            if (!key.isValid()) {
                return;
            }
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) != 0) {        
                key.interestOps(interestOps & ~readInterestOp);
            }
        }

而當服務端的處理速度恢復正常,我們又可以通過 channelHandlerContext.channel().config().setAutoRead(true) 將 autoRead 屬性設置為 true 。這樣 netty 會在 pipeline 中觸發 read 事件,最終在 HeadContext 中的 read 事件回調方法中通過調用 unsafe#beginRead 方法將 channel 感興趣的讀類型事件重新註冊到對應的 reactor 中。

    @Override
    public ChannelConfig setAutoRead(boolean autoRead) {
        boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
        if (autoRead && !oldAutoRead) {
            //autoRead從false變為true
            channel.read();
        } else if (!autoRead && oldAutoRead) {
            //autoRead從true變為false
            autoReadCleared();
        }
        return this;
    }

read 事件可以理解為使 channel 擁有讀的能力,當有了讀的能力後, channelRead 就可以讀取具體的數據了。

3.2.2 write 和 flush 事件

write 事件和 flush 事件我們在《一文搞懂Netty發送數據全流程》一文中已經非常詳盡的介紹過了,這裡筆者在帶大家簡單回顧一下。

write 事件和 flush 事件均由用戶在處理完業務請求得到業務結果後在業務線程中主動觸發。

用戶既可以通過 ChannelHandlerContext 觸發也可以通過 Channel 來觸發。

不同之處在於如果通過 ChannelHandlerContext 觸發,那麼 write 事件或者 flush 事件就會在 pipeline 中從當前 ChannelHandler 開始一直向前傳播直到 HeadContext 。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
       ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

如果通過 Channel 觸發,那麼 write 事件和 flush 事件就會從 pipeline 的尾部節點 TailContext 開始一直向前傳播直到 HeadContext 。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
       ctx.channel().write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.channel().flush();
    }

當然還有一個 writeAndFlush 方法,也會分為 ChannelHandlerContext 觸發和 Channel 的觸發。觸發 writeAndFlush 後,write 事件首先會在 pipeline 中傳播,最後 flush 事件在 pipeline 中傳播。

netty 對 write 事件的處理最終會將發送數據寫入 Channel 對應的寫緩衝隊列 ChannelOutboundBuffer 中。此時數據並沒有發送出去而是在寫緩衝隊列中緩存,這也是 netty 實現非同步寫的核心設計。

最終通過 flush 操作從 Channel 中的寫緩衝隊列 ChannelOutboundBuffer 中獲取到待發送數據,並寫入到 Socket 的發送緩衝區中。

3.2.3 close 事件

當用戶在 ChannelHandler 中調用如下方法對 Channel 進行關閉時,會觸發 Close 事件在 pipeline 中從後向前傳播。

//close事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.close();
//close事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().close();

我們可以在Outbound類型的ChannelHandler中響應close事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        
        .....客戶端channel關閉之前的處理回調.....
        
        //繼續向前傳播close事件
        super.close(ctx, promise);
    }
}

最終 close 事件會在 pipeline 中一直向前傳播直到頭結點 HeadConnect 中,併在 HeadContext 中完成連接關閉的操作,當連接完成關閉之後,會在 pipeline中先後觸發 ChannelInactive 事件和 ChannelUnregistered 事件。

3.2.4 deRegister 事件

用戶可調用如下代碼將當前 Channel 從 Reactor 中註銷掉。

//deregister事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.deregister();
//deregister事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().deregister();

我們可以在 Outbound 類型的 ChannelHandler 中響應 deregister 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {


        .....客戶端channel取消註冊之前的處理回調.....

        //繼續向前傳播connect事件
        super.deregister(ctx, promise);
    }
}

最終 deRegister 事件會傳播至 pipeline 中的頭結點 HeadContext 中,併在 HeadContext 中完成底層 channel 取消註冊的操作。當 Channel 從 Reactor 上註銷之後,從此 Reactor 將不會在監聽 Channel 上的 IO 事件,並觸發 ChannelUnregistered 事件在 pipeline 中傳播。

3.2.5 connect 事件

在 Netty 的客戶端中我們可以利用 NioSocketChannel 的 connect 方法觸發 connect 事件在 pipeline 中傳播。

//connect事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.connect(remoteAddress);
//connect事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().connect(remoteAddress);

我們可以在 Outbound 類型的 ChannelHandler 中響應 connect 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                        ChannelPromise promise) throws Exception {
                 
        
        .....客戶端channel連接成功之前的處理回調.....
        
        //繼續向前傳播connect事件
        super.connect(ctx, remoteAddress, localAddress, promise);
    }
}

最終 connect 事件會在 pipeline 中的頭結點 headContext 中觸發底層的連接建立請求。當客戶端成功連接到服務端之後,會在客戶端 NioSocketChannel 的 pipeline 中傳播 channelActive 事件。

3.2.6 disConnect 事件

在 Netty 的客戶端中我們也可以調用 NioSocketChannel 的 disconnect 方法在 pipeline 中觸發 disconnect 事件,這會導致 NioSocketChannel 的關閉。

//disconnect事件從當前ChannelHandlerContext開始在pipeline中向前傳播
ctx.disconnect();
//disconnect事件從pipeline的尾結點tailContext開始向前傳播
ctx.channel().disconnect();

我們可以在 Outbound 類型的 ChannelHandler 中響應 disconnect 事件。

public class ExampleChannelHandler extends ChannelOutboundHandlerAdapter {


    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        
        .....客戶端channel即將關閉前的處理回調.....
        
        //繼續向前傳播disconnect事件
        super.disconnect(ctx, promise);
    }
}

最終 disconnect 事件會傳播到 HeadContext 中,併在 HeadContext 中完成底層的斷開連接操作,當客戶端斷開連接成功關閉之後,會在 pipeline 中先後觸發 ChannelInactive 事件和 ChannelUnregistered 事件。

4. 向pipeline添加channelHandler

在我們詳細介紹了全部的 inbound 類事件和 outbound 類事件的掩碼表示以及事件的觸發和傳播路徑後,相信大家現在可以通過 ChannelInboundHandler 和 ChannelOutboundHandler 來根據具體的業務場景選擇合適的 ChannelHandler 類型以及監聽合適的事件來完成業務需求了。

本小節就該介紹一下自定義的 ChannelHandler 是如何添加到 pipeline 中的,netty 在這個過程中幫我們作了哪些工作?

           final EchoServerHandler serverHandler = new EchoServerHandler();

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)

             .............

             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();          
                     p.addLast(serverHandler);

                     ......可添加多個channelHandler......
                 }
             });

以上是筆者簡化的一個 netty 服務端配置 ServerBootstrap 啟動類的一段示例代碼。我們可以看到再向 channel 對應的 pipeline 中添加 ChannelHandler 是通過 ChannelPipeline#addLast 方法將指定 ChannelHandler 添加到 pipeline 的末尾處。

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    //向pipeline的末尾處批量添加多個channelHandler
    ChannelPipeline addLast(ChannelHandler... handlers);

    //指定channelHandler的executor,由指定的executor執行channelHandler中的回調方法
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

     //為channelHandler指定名稱
    ChannelPipeline addLast(String name, ChannelHandler handler);

    //為channelHandler指定executor和name
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
}
public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }
}

最終 addLast 的這些重載方法都會調用到 DefaultChannelPipeline#addLast(EventExecutorGroup, String, ChannelHandler) 這個方法從而完成 ChannelHandler 的添加。

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //檢查同一個channelHandler實例是否允許被重覆添加
            checkMultiplicity(handler);

            //創建channelHandlerContext包裹channelHandler並封裝執行傳播事件相關的上下文信息
            newCtx = newContext(group, filterName(name, handler), handler);

            //將channelHandelrContext插入到pipeline中的末尾處。雙向鏈表操作
            //此時channelHandler的狀態還是ADD_PENDING,只有當channelHandler的handlerAdded方法被回調後,狀態才會為ADD_COMPLETE
            addLast0(newCtx);

            //如果當前channel還沒有向reactor註冊,則將handlerAdded方法的回調添加進pipeline的任務隊列中
            if (!registered) {
                //這裡主要是用來處理ChannelInitializer的情況
                //設置channelHandler的狀態為ADD_PENDING 即等待添加,當狀態變為ADD_COMPLETE時 channelHandler中的handlerAdded會被回調
                newCtx.setAddPending();
                //向pipeline中添加PendingHandlerAddedTask任務,在任務中回調handlerAdded
                //當channel註冊到reactor後,pipeline中的pendingHandlerCallbackHead任務鏈表會被挨個執行
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            //如果當前channel已經向reactor註冊成功,那麼就直接回調channelHandler中的handlerAddded方法
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                //這裡需要確保channelHandler中handlerAdded方法的回調是在channel指定的executor中
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        //回調channelHandler中的handlerAddded方法
        callHandlerAdded0(newCtx);
        return this;
    }

這個方法的邏輯還是比較複雜的,涉及到很多細節,為了清晰地為大家講述,筆者這裡還是採用總分總的結構,先描述該方法的總體邏輯,然後在針對核心細節要點展開細節分析。

因為向 pipeline 中添加 channelHandler 的操作可能會在多個線程中進行,所以為了確保添加操作的線程安全性,這裡採用一個 synchronized 語句塊將整個添加邏輯包裹起來。

  1. 通過 checkMultiplicity 檢查被添加的 ChannelHandler 是否是共用的(標註 @Sharable 註解),如果不是共用的那麼則不會允許
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在微服務架構下,我們習慣使用多機器、分散式存儲、緩存去支持一個高併發的請求模型,而忽略了單機高併發模型是如何工作的。這篇文章通過解構客戶端與服務端的建立連接和數據傳輸過程,闡述下如何進行單機高併發模型設計。 ...
  • 在項目中,有些代碼需要被各個模塊調用。為瞭解耦,可以把這些公共部分的代碼整合到一個子項目中,併發布到本地,實現多個項目共用代碼。 新建子項目。 maven -clean 命令,測試maven環境;測試通過後使用maven -install 將項目打包併發布到本地倉庫。 在其他項目中,通過<depen ...
  • SpringBoot開發 - 如何定製自己的Banner?還能用圖片? 我們在啟動Spring Boot程式時,有SpringBoot的Banner信息,那麼如何自定義成自己項目的信息呢? @pdai SpringBoot開發 - 如何定製自己的Banner?還能用圖片? 什麼是Banner 如何更 ...
  • Java面向對象(二) 五、方法 5.1 方法的重載(overload) 定義:在同一個類中,允許定義多個相同名字的方法,只要參數列表(參數類型或者參數個數)是不同的。 判斷是否為方法重載: (1)同一個類,同樣的方法名,不同的參數列表! (2)與方法修飾符、方法返回值、形參名、方法體無關! pub ...
  • Reentrant 2 前兩篇寫完了後我自己研究了下,還有有很多疑惑和問題,這篇就繼續以自問自答的方式寫 如果沒看過第1篇的可以先看看那個https://www.cnblogs.com/sunankang/p/16458795.html public final void acquire(int a ...
  • Aspose簡介 Aspose.Total是Aspose公司旗下全套文件格式處理解決方案,提供最完整、最高效的文檔處理解決方案集,無需任何其他軟體安裝和依賴。主要提供.net、java、C++d三個開發語言的工具包,通過它,可以對辦公文檔格式的轉換和文檔內容的線上編輯,如:Word, Excel, ...
  • Java面向對象(一) 一、面向過程(POP)與面向對象(OOP) 二者都是一種思想,面向對象是相對於面向過程而言的。面向過程,強調的是功能行為,以函數為最小單位,考慮怎麼做。面向對象,將功能封裝進對 象,強調具備了功能的對象,以類/對象為最小單位,考慮誰來做。 面向對象的三大特征: 封裝 繼承 多 ...
  • 稀疏組織 當一個數組中大部分元素為0,或者為同一個值的數組時,可以用稀疏數組來保存該數組 稀疏數組,記錄一共有幾行幾列,有多少個不同值 把具有不同值的元素和行里了及值記錄在一個小規模的數組中,從而縮小程式的規模! 我們定義一下原始數組: 原始數組如下: 0 0 3 0 0 0 0 0 0 4 0 0 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...