Netty 如何高效接收網路數據?一文聊透 ByteBuffer 動態自適應擴縮容機制

来源:https://www.cnblogs.com/binlovetech/archive/2022/07/05/16448527.html
-Advertisement-
Play Games

本系列Netty源碼解析文章基於 4.1.56.Final版本,公眾號:bin的技術小屋 前文回顧 在前邊的系列文章中,我們從內核如何收髮網絡數據開始以一個C10K的問題作為主線詳細從內核角度闡述了網路IO模型的演變,最終在此基礎上引出了Netty的網路IO模型如下圖所示: 詳細內容可回看《從內核角 ...


本系列Netty源碼解析文章基於 4.1.56.Final版本,公眾號:bin的技術小屋

本文概覽.png

前文回顧

在前邊的系列文章中,我們從內核如何收髮網絡數據開始以一個C10K的問題作為主線詳細從內核角度闡述了網路IO模型的演變,最終在此基礎上引出了Netty的網路IO模型如下圖所示:

netty中的reactor.png

詳細內容可回看《從內核角度看IO模型的演變》

後續我們又圍繞著Netty的主從Reactor網路IO線程模型,在《Reactor模型在Netty中的實現》一文中詳細闡述了Netty的主從Reactor模型的創建,以及介紹了Reactor模型的關鍵組件。搭建了Netty的核心骨架如下圖所示:

主從Reactor線程組.png

在核心骨架搭建完畢之後,我們隨後又在《詳細圖解Reactor啟動全流程》一文中闡述了Reactor啟動的全流程,一個非常重要的核心組件NioServerSocketChannel開始在這裡初次亮相,承擔著一個網路框架最重要的任務--高效接收網路連接。我們介紹了NioServerSocketChannel的創建,初始化,向Main Reactor註冊並監聽OP_ACCEPT事件的整個流程。在此基礎上,Netty得以整裝待發,枕戈待旦開始迎接海量的客戶端連接。

Reactor啟動後的結構.png

隨後緊接著我們在《Netty如何高效接收網路連接》一文中詳細介紹了Netty高效接收客戶端網路連接的全流程,在這裡Netty的核心重要組件NioServerSocketChannel開始正是登場,在NioServerSocketChannel中我們創建了客戶端連接NioSocketChannel,並詳細介紹了NioSocketChannel的初始化過程,隨後通過在NioServerSocketChannel的pipeline中觸發ChannelRead事件,並最終在ServerBootstrapAcceptor中將客戶端連接NioSocketChannel註冊到Sub Reactor中開始監聽客戶端連接上的OP_READ事件,準備接收客戶端發送的網路數據也就是本文的主題內容。

image.png

自此Netty的核心組件全部就緒並啟動完畢,開始起飛~~~

主從Reactor組完整結構.png

之前文章中的主角是Netty中主Reactor組中的Main Reactor以及註冊在Main Reactor上邊的NioServerSocketChannel,那麼從本文開始,我們文章中的主角就切換為Sub Reactor以及註冊在SubReactor上的NioSocketChannel了。

下麵就讓我們正式進入今天的主題,看一下Netty是如何處理OP_READ事件以及如何高效接收網路數據的。

1. Sub Reactor處理OP_READ事件流程總覽

OP_READ事件處理.png

客戶端發起系統IO調用向服務端發送數據之後,當網路數據到達服務端的網卡並經過內核協議棧的處理,最終數據到達Socket的接收緩衝區之後,Sub Reactor輪詢到NioSocketChannel上的OP_READ事件就緒,隨後Sub Reactor線程就會從JDK Selector上的阻塞輪詢APIselector.select(timeoutMillis)調用中返回。轉而去處理NioSocketChannel上的OP_READ事件

註意這裡的Reactor為負責處理客戶端連接的Sub Reactor。連接的類型為NioSocketChannel,處理的事件為OP_READ事件。

在之前的文章中筆者已經多次強調過了,Reactor在處理Channel上的IO事件入口函數為NioEventLoop#processSelectedKey

public final class NioEventLoop extends SingleThreadEventLoop {

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ..............省略.................

        try {
            int readyOps = k.readyOps();

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
               ..............處理OP_CONNECT事件.................
            }


            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
              ..............處理OP_WRITE事件.................
            }


            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //本文重點處理OP_ACCEPT事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

}

這裡需要重點強調的是,當前的執行線程現在已經變成了Sub Reactor,而Sub Reactor上註冊的正是netty客戶端NioSocketChannel負責處理連接上的讀寫事件。

所以這裡入口函數的參數AbstractNioChannel ch則是IO就緒的客戶端連接NioSocketChannel

開頭通過ch.unsafe()獲取到的NioUnsafe操作類正是NioSocketChannel中對底層JDK NIO SocketChannel的Unsafe底層操作類。實現類型為NioByteUnsafe定義在下圖繼承結構中的AbstractNioByteChannel父類中。

image.png

下麵我們到NioByteUnsafe#read方法中來看下Netty對OP_READ事件的具體處理過程:

2. Netty接收網路數據流程總覽

我們直接按照老規矩,先從整體上把整個OP_READ事件的邏輯處理框架提取出來,讓大家先總體俯視下流程全貌,然後在針對每個核心點位進行各個擊破。

Netty接收網路數據流程.png

流程中相關置灰的步驟為Netty處理連接關閉時的邏輯,和本文主旨無關,我們這裡暫時忽略,等後續筆者介紹連接關閉時,會單獨開一篇文章詳細為大家介紹。

從上面這張Netty接收網路數據總體流程圖可以看出NioSocketChannel在接收網路數據的整個流程和我們在上篇文章《Netty如何高效接收網路連接》中介紹的NioServerSocketChannel在接收客戶端連接時的流程在總體框架上是一樣的。

NioSocketChannel在接收網路數據的過程處理中,也是通過在一個do{....}while(...)迴圈read loop中不斷的迴圈讀取連接NioSocketChannel上的數據。

同樣在NioSocketChannel讀取連接數據的read loop中也是受最大讀取次數的限制。預設配置最多只能讀取16次,超過16次無論此時NioSocketChannel中是否還有數據可讀都不能在進行讀取了。

這裡read loop迴圈最大讀取次數可在啟動配置類ServerBootstrap中通過ChannelOption.MAX_MESSAGES_PER_READ選項設置,預設為16。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定義次數)

Netty這裡為什麼非得限制read loop的最大讀取次數呢?為什麼不在read loop中一次性把數據讀取完呢?

這時候就是考驗我們大局觀的時候了,在前邊的文章介紹中我們提到Netty的IO模型為主從Reactor線程組模型,在Sub Reactor Group中包含了多個Sub Reactor專門用於監聽處理客戶端連接上的IO事件。

為了能夠高效有序的處理全量客戶端連接上的讀寫事件,Netty將服務端承載的全量客戶端連接分攤到多個Sub Reactor中處理,同時也能保證Channel上IO處理的線程安全性

其中一個Channel只能分配給一個固定的Reactor。一個Reactor負責處理多個Channel上的IO就緒事件,Reactor與Channel之間的對應關係如下圖所示:

image.png

而一個Sub Reactor上註冊了多個NioSocketChannel,Netty不可能在一個NioSocketChannel上無限制的處理下去,要將讀取數據的機會均勻分攤給其他NioSocketChannel,所以需要限定每個NioSocketChannel上的最大讀取次數。

此外,Sub Reactor除了需要監聽處理所有註冊在它上邊的NioSocketChannel中的IO就緒事件之外,還需要騰出事件來處理有用戶線程提交過來的非同步任務。從這一點看,Netty也不會一直停留在NioSocketChannel的IO處理上。所以限制read loop的最大讀取次數是非常必要的。

關於Reactor的整體運轉架構,對細節部分感興趣的同學可以回看下筆者的《一文聊透Netty核心引擎Reactor的運轉架構》這篇文章。

所以基於這個原因,我們需要在read loop迴圈中,每當通過doReadBytes方法從NioSocketChannel中讀取到數據時(方法返回值會大於0,並記錄在allocHandle.lastBytesRead中),都需要通過allocHandle.incMessagesRead(1)方法統計已經讀取的次數。當達到16次時不管NioSocketChannel是否還有數據可讀,都需要在read loop末尾退出迴圈。轉去執行Sub Reactor上的非同步任務。以及其他NioSocketChannel上的IO就緒事件。平均分配,雨露均沾!!

public abstract class MaxMessageHandle implements ExtendedHandle {

        //read loop總共讀取了多少次
        private int totalMessages;

       @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }

}

本次read loop讀取到的數據大小會記錄在allocHandle.lastBytesRead

public abstract class MaxMessageHandle implements ExtendedHandle {

         //本次read loop讀取到的位元組數
        private int lastBytesRead;
        //整個read loop迴圈總共讀取的位元組數
        private int totalBytesRead;

        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }
}
  • lastBytesRead < 0:表示客戶端主動發起了連接關閉流程,Netty開始連接關閉處理流程。這個和本文的主旨無關,我們先不用管。後面筆者會專門用一篇文章來詳解關閉流程。

  • lastBytesRead = 0:表示當前NioSocketChannel上的數據已經全部讀取完畢,沒有數據可讀了。本次OP_READ事件圓滿處理完畢,可以開開心心的退出read loop。

  • lastBytesRead > 0:表示在本次read loop中從NioSocketChannel中讀取到了數據,會在NioSocketChannel的pipeline中觸發ChannelRead事件。進而在pipeline中負責IO處理的ChannelHandelr中響應,處理網路請求。

fireChannelRread.png

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
          .......處理網路請求,比如解碼,反序列化等操作.......
    }
}

最後會在read loop迴圈的末尾調用allocHandle.continueReading()判斷是否結束本次read loop迴圈。這裡的結束迴圈條件的判斷會比我們在介紹NioServerSocketChannel接收連接時的判斷條件複雜很多,筆者會將這個判斷條件的詳細解析放在文章後面細節部分為大家解讀,這裡大家只需要把握總體核心流程,不需要關註太多細節。

總體上在NioSocketChannel中讀取網路數據的read loop迴圈結束條件需要滿足以下幾點:

  • 當前NioSocketChannel中的數據已經全部讀取完畢,則退出迴圈。

  • 本輪read loop如果沒有讀到任何數據,則退出迴圈。

  • read loop的讀取次數達到16次,退出迴圈。

當滿足這裡的read loop退出條件之後,Sub Reactor線程就會退出迴圈,隨後會調用allocHandle.readComplete()方法根據本輪read loop總共讀取到的位元組數totalBytesRead來決定是否對用於接收下一輪OP_READ事件數據的ByteBuffer進行擴容或者縮容。

最後在NioSocketChannel的pipeline中觸發ChannelReadComplete事件,通知ChannelHandler本次OP_READ事件已經處理完畢。

fireChannelReadComplete.png


public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
       .......處理網路請求,比如解碼,反序列化等操作.......
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ......本次OP_READ事件處理完畢.......
        ......決定是否向客戶端響應處理結果......
    }
}

2.1 ChannelRead與ChannelReadComplete事件的區別

有些小伙伴可能對Netty中的一些傳播事件觸發的時機,或者事件之間的區別理解的不是很清楚,概念容易混淆。在後面的文章中筆者也會從源碼的角度出發給大家說清楚Netty中定義的所有非同步事件,以及這些事件之間的區別和聯繫和觸發時機,傳播機制。

這裡我們主要探討本文主題中涉及到的兩個事件:ChannelRead事件與ChannelReadComplete事件。

從上述介紹的Netty接收網路數據流程總覽中我們可以看出ChannelRead事件ChannelReadComplete事件是不一樣的,但是對於剛接觸Netty的小伙伴來說從命名上乍一看感覺又差不多。

下麵我們來看這兩個事件之間的差別:

Netty服務端對於一次OP_READ事件的處理,會在一個do{}while()迴圈read loop中分多次從客戶端NioSocketChannel中讀取網路數據。每次讀取我們分配的ByteBuffer容量大小,初始容量為2048。

  • ChanneRead事件:一次迴圈讀取一次數據,就觸發一次ChannelRead事件。本次最多讀取在read loop迴圈開始分配的DirectByteBuffer容量大小。這個容量會動態調整,文章後續筆者會詳細介紹。

  • ChannelReadComplete事件:當讀取不到數據或者不滿足continueReading 的任意一個條件就會退出read loop,這時就會觸發ChannelReadComplete事件。表示本次OP_READ事件處理完畢。

這裡需要特別註意下觸發ChannelReadComplete事件並不代表NioSocketChannel中的數據已經讀取完了,只能說明本次OP_READ事件處理完畢。因為有可能是客戶端發送的數據太多,Netty讀了16次還沒讀完,那就只能等到下次OP_READ事件到來的時候在進行讀取了。


以上內容就是Netty在接收客戶端發送網路數據的全部核心邏輯。目前為止我們還未涉及到這部分的主幹核心源碼,筆者想的是先給大家把核心邏輯講解清楚之後,這樣理解起來核心主幹源碼會更加清晰透徹。

經過前邊對網路數據接收的核心邏輯介紹,筆者在把這張流程圖放出來,大家可以結合這張圖在來回想下主幹核心邏輯。

Netty接收網路數據流程.png

下麵筆者會結合這張流程圖,給大家把這部分的核心主幹源碼框架展現出來,大家可以將我們介紹過的核心邏輯與主幹源碼做個一一對應,還是那句老話,我們要從主幹框架層面把握整體處理流程,不需要讀懂每一行代碼,文章後續筆者會將這個過程中涉及到的核心點位給大家拆開來各個擊破!!

image.png

3. 源碼核心框架總覽

        @Override
        public final void read() {
            final ChannelConfig config = config();

            ...............處理半關閉相關代碼省略...............
            //獲取NioSocketChannel的pipeline
            final ChannelPipeline pipeline = pipeline();
            //PooledByteBufAllocator 具體用於實際分配ByteBuf的分配器
            final ByteBufAllocator allocator = config.getAllocator();
            //自適應ByteBuf分配器 AdaptiveRecvByteBufAllocator ,用於動態調節ByteBuf容量
            //需要與具體的ByteBuf分配器配合使用 比如這裡的PooledByteBufAllocator
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            //allocHandler用於統計每次讀取數據的大小,方便下次分配合適大小的ByteBuf
            //重置清除上次的統計指標
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    //利用PooledByteBufAllocator分配合適大小的byteBuf 初始大小為2048
                    byteBuf = allocHandle.allocate(allocator);
                    //記錄本次讀取了多少位元組數
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //如果本次沒有讀取到任何位元組,則退出迴圈 進行下一輪事件輪詢
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            ......表示客戶端發起連接關閉.....
                        }
                        break;
                    }

                    //read loop讀取數據次數+1
                    allocHandle.incMessagesRead(1);
                    //客戶端NioSocketChannel的pipeline中觸發ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
                    //解除本次讀取數據分配的ByteBuffer引用,方便下一輪read loop分配
                    byteBuf = null;
                } while (allocHandle.continueReading());//判斷是否應該繼續read loop

                //根據本次read loop總共讀取的位元組數,決定下次是否擴容或者縮容
                allocHandle.readComplete();
                //在NioSocketChannel的pipeline中觸發ChannelReadComplete事件,表示一次read事件處理完畢
                //但這並不表示 客戶端發送來的數據已經全部讀完,因為如果數據太多的話,這裡只會讀取16次,剩下的會等到下次read事件到來後在處理
                pipeline.fireChannelReadComplete();

                .........省略連接關閉流程處理.........
            } catch (Throwable t) {
                ...............省略...............
            } finally {
               ...............省略...............
            }
        }
    }

這裡再次強調下當前執行線程為Sub Reactor線程,處理連接數據讀取邏輯是在NioSocketChannel中。

首先通過config()獲取客戶端NioSocketChannel的Channel配置類NioSocketChannelConfig。

通過pipeline()獲取NioSocketChannel的pipeline。我們在《詳細圖解Netty Reactor啟動全流程》一文中提到的Netty服務端模板所舉的示例中,NioSocketChannelde pipeline中只有一個EchoChannelHandler。

客戶端channel pipeline結構.png

3.1 分配DirectByteBuffer接收網路數據

Sub Reactor在接收NioSocketChannel上的IO數據時,都會分配一個ByteBuffer用來存放接收到的IO數據。

這裡大家可能覺得比較奇怪,為什麼在NioSocketChannel接收數據這裡會有兩個ByteBuffer分配器呢?一個是ByteBufAllocator,另一個是RecvByteBufAllocator。

    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

這兩個ByteBuffer又有什麼區別和聯繫呢?

在上篇文章《抓到Netty一個Bug,順帶來透徹地聊一下Netty是如何高效接收網路連接》中,筆者為了闡述上篇文章中提到的Netty在接收網路連接時的Bug時,簡單和大家介紹了下這個RecvByteBufAllocator。

在上篇文章提到的NioServerSocketChannelConfig中,這裡的RecvByteBufAllocator類型為ServerChannelRecvByteBufAllocator。

image.png

還記得這個ServerChannelRecvByteBufAllocator類型在4.1.69.final版本引入是為瞭解決筆者在上篇文章中提到的那個Bug嗎?在4.1.69.final版本之前,NioServerSocketChannelConfig中的RecvByteBufAllocator類型為AdaptiveRecvByteBufAllocator。

而在本文中NioSocketChannelConfig中的RecvByteBufAllocator類型為AdaptiveRecvByteBufAllocator。

image.png

所以這裡recvBufAllocHandle()獲得到的RecvByteBufAllocator為AdaptiveRecvByteBufAllocator。顧名思義,這個類型的RecvByteBufAllocator可以根據NioSocketChannel上每次到來的IO數據大小來自適應動態調整ByteBuffer的容量。

對於客戶端NioSocketChannel來說,它裡邊包含的IO數據時客戶端發送來的網路數據,長度是不定的,所以才會需要這樣一個可以根據每次IO數據的大小來自適應動態調整容量的ByteBuffer來接收。

如果我們把用於接收數據用的ByteBuffer看做一個桶的話,那麼小數據用大桶裝或者大數據用小桶裝肯定是不合適的,所以我們需要根據接收數據的大小來動態調整桶的容量。而AdaptiveRecvByteBufAllocator的作用正是用來根據每次接收數據的容量大小來動態調整ByteBuffer的容量的。

現在RecvByteBufAllocator筆者為大家解釋清楚了,接下來我們繼續看ByteBufAllocator。

大家這裡需要註意的是AdaptiveRecvByteBufAllocator並不會真正的去分配ByteBuffer,它只是負責動態調整分配ByteBuffer的大小。

而真正具體執行記憶體分配動作的是這裡的ByteBufAllocator類型為PooledByteBufAllocator。它會根據AdaptiveRecvByteBufAllocator動態調整齣來的大小去真正的申請記憶體分配ByteBuffer。

PooledByteBufAllocator為Netty中的記憶體池,用來管理堆外記憶體DirectByteBuffer。

AdaptiveRecvByteBufAllocator中的allocHandle在上篇文章中我們也介紹過了,它的實際類型為MaxMessageHandle。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }
    
    private final class HandleImpl extends MaxMessageHandle {
                  .................省略................
    }
}

在MaxMessageHandle中包含了用於動態調整ByteBuffer容量的統計指標。

   public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;

        //用於控制每次read loop里最大可以迴圈讀取的次數,預設為16次
        //可在啟動配置類ServerBootstrap中通過ChannelOption.MAX_MESSAGES_PER_READ選項設置。
        private int maxMessagePerRead;

        //用於統計read loop中總共接收的連接個數,NioSocketChannel中表示讀取數據的次數
        //每次read loop迴圈後會調用allocHandle.incMessagesRead增加記錄接收到的連接個數
        private int totalMessages;

        //用於統計在read loop中總共接收到客戶端連接上的數據大小
        private int totalBytesRead;

        //表示本次read loop 嘗試讀取多少位元組,byteBuffer剩餘可寫的位元組數
        private int attemptedBytesRead;

        //本次read loop讀取到的位元組數
        private int lastBytesRead;
        
        //預計下一次分配buffer的容量,初始:2048
        private int nextReceiveBufferSize;
        ...........省略.............
}

在每輪read loop開始之前,都會調用allocHandle.reset(config)重置清空上一輪read loop的統計指標。

        @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            //預設每次最多讀取16次
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }

在每次開始從NioSocketChannel中讀取數據之前,需要利用PooledByteBufAllocator在記憶體池中為ByteBuffer分配記憶體,預設初始化大小為2048,這個容量由guess()方法決定。

        byteBuf = allocHandle.allocate(allocator);
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }

        @Override
        public int guess() {
            //預計下一次分配buffer的容量,一開始為2048
            return nextReceiveBufferSize;
        }

在每次通過doReadBytes從NioSocketChannel中讀取到數據後,都會調用allocHandle.lastBytesRead(doReadBytes(byteBuf))記錄本次讀取了多少位元組數據,並統計本輪read loop目前總共讀取了多少位元組。

        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }

每次迴圈從NioSocketChannel中讀取數據之後,都會調用allocHandle.incMessagesRead(1)。統計當前已經讀取了多少次。如果超過了最大讀取限制此時16次,就需要退出read loop。去處理其他NioSocketChannel上的IO事件。

        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }

在每次read loop迴圈的末尾都需要通過調用allocHandle.continueReading()來判斷是否繼續read loop迴圈讀取NioSocketChannel中的數據。

        @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }

        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                //判斷本次讀取byteBuffer是否滿載而歸
                return attemptedBytesRead == lastBytesRead;
            }
        };

        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }
  • attemptedBytesRead :表示當前ByteBuffer預計嘗試要寫入的位元組數。
  • lastBytesRead :表示本次read loop真實讀取到了多少個位元組。

defaultMaybeMoreSupplier 用於判斷經過本次read loop讀取數據後,ByteBuffer是否滿載而歸。如果是滿載而歸的話(attemptedBytesRead == lastBytesRead),表明可能NioSocketChannel里還有數據。如果不是滿載而歸,表明NioSocketChannel里沒有數據了已經。

是否繼續進行read loop需要同時滿足以下幾個條件:

  • totalMessages < maxMessagePerRead 當前讀取次數是否已經超過16次,如果超過,就退出do(...)while()迴圈。進行下一輪OP_READ事件的輪詢。因為每個Sub Reactor管理了多個NioSocketChannel,不能在一個NioSocketChannel上占用太多時間,要將機會均勻地分配給Sub Reactor所管理的所有NioSocketChannel。

  • totalBytesRead > 0 本次OP_READ事件處理是否讀取到了數據,如果已經沒有數據可讀了,那麼就直接退出read loop。

  • !respectMaybeMoreData || maybeMoreDataSupplier.get() 這個條件比較複雜,它其實就是通過respectMaybeMoreData欄位來控制NioSocketChannel中可能還有數據可讀的情況下該如何處理。

    • maybeMoreDataSupplier.get():true表示本次讀取從NioSocketChannel中讀取數據,ByteBuffer滿載而歸。說明可能NioSocketChannel中還有數據沒讀完。fasle表示ByteBuffer還沒有裝滿,說明NioSocketChannel中已經沒有數據可讀了。
    • respectMaybeMoreData = true表示要對可能還有更多數據進行處理的這種情況要respect認真對待,如果本次迴圈讀取到的數據已經裝滿ByteBuffer,表示後面可能還有數據,那麼就要進行讀取。如果ByteBuffer還沒裝滿表示已經沒有數據可讀了那麼就退出迴圈。
      image.png
    • respectMaybeMoreData = false表示對可能還有更多數據的這種情況不認真對待 not respect。不管本次迴圈讀取數據ByteBuffer是否滿載而歸,都要繼續進行讀取,直到讀取不到數據在退出迴圈,屬於無腦讀取。

同時滿足以上三個條件,那麼read loop繼續進行。繼續從NioSocketChannel中讀取數據,直到讀取不到或者不滿足三個條件中的任意一個為止。

3.2 從NioSocketChannel中讀取數據

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());    
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }
}

這裡會直接調用底層JDK NIO的SocketChannel#read方法將數據讀取到DirectByteBuffer中。讀取數據大小為本次分配的DirectByteBuffer容量,初始為2048。

4. ByteBuffer動態自適應擴縮容機制

由於我們一開始並不知道客戶端會發送多大的網路數據,所以這裡先利用PooledByteBufAllocator分配一個初始容量為2048的DirectByteBuffer用於接收數據。

  byteBuf = allocHandle.allocate(allocator);

這就好比我們需要拿著一個桶去排隊裝水,但是第一次去裝的時候,我們並不知道管理員會給我們分配多少水,桶拿大了也不合適拿小了也不合適,於是我們就先預估一個差不多容量大小的桶,如果分配的多了,我們下次就拿更大一點的桶,如果分配少了,下次我們就拿一個小點的桶。

在這種場景下,我們需要ByteBuffer可以自動根據每次網路數據的大小來動態自適應調整自己的容量。

而ByteBuffer動態自適應擴縮容機制依賴於AdaptiveRecvByteBufAllocator類的實現。讓我們先回到AdaptiveRecvByteBufAllocator類的創建起點開始說起~~

4.1 AdaptiveRecvByteBufAllocator的創建

在前文《Netty是如何高效接收網路連接》中我們提到,當Main Reactor監聽到OP_ACCPET事件活躍後,會在NioServerSocketChannel中accept完成三次握手的客戶端連接。並創建NioSocketChannel,伴隨著NioSocketChannel的創建其對應的配置類NioSocketChannelConfig類也會隨之創建。

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

最終會在NioSocketChannelConfig的父類DefaultChannelConfig的構造器中創建AdaptiveRecvByteBufAllocator 。並保存在RecvByteBufAllocator rcvBufAllocator欄位中。

public class DefaultChannelConfig implements ChannelConfig {

    //用於Channel接收數據用的buffer分配器  AdaptiveRecvByteBufAllocator
    private volatile RecvByteBufAllocator rcvBufAllocator;

    public DefaultChannelConfig(Channel channel) {
            this(channel, new AdaptiveRecvByteBufAllocator());
    }

}

new AdaptiveRecvByteBufAllocator()創建AdaptiveRecvByteBufAllocator類實例的時候會先觸發AdaptiveRecvByteBufAllocator類的初始化。

我們先來看下AdaptiveRecvByteBufAllocator類的初始化都做了些什麼事情:

4.2 AdaptiveRecvByteBufAllocator類的初始化

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    //擴容步長
    private static final int INDEX_INCREMENT = 4;
    //縮容步長
    private static final int INDEX_DECREMENT = 1;

    //RecvBuf分配容量表(擴縮容索引表)按照表中記錄的容量大小進行擴縮容
    private static final int[] SIZE_TABLE;

   static {
        //初始化RecvBuf容量分配表
        List<Integer> sizeTable = new ArrayList<Integer>();
        //當分配容量小於512時,擴容單位為16遞增
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        //當分配容量大於512時,擴容單位為一倍
        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        //初始化RecbBuf擴縮容索引表
        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }
}

AdaptiveRecvByteBufAllocator 主要的作用就是為接收數據的ByteBuffer進行擴容縮容,那麼每次怎麼擴容?擴容多少?怎麼縮容?縮容多少呢??

這四個問題將是本小節筆者要為大家解答的內容~~~

Netty中定義了一個int型的數組SIZE_TABLE 來存儲每個擴容單位對應的容量大小。建立起擴縮容的容量索引表。每次擴容多少,縮容多少全部記錄在這個容量索引表中。

在AdaptiveRecvByteBufAllocatorl類初始化的時候會在static{}靜態代碼塊中對擴縮容索引表SIZE_TABLE 進行初始化。

從源碼中我們可以看出SIZE_TABLE 的初始化分為兩個部分:

  • 當索引容量小於512時,SIZE_TABLE 中定義的容量索引是從16開始16遞增。

image.png

  • 當索引容量大於512時,SIZE_TABLE 中定義的容量索引是按前一個索引容量的2倍遞增。

image.png

4.3 擴縮容邏輯

現在擴縮容索引表SIZE_TABLE 已經初始化完畢了,那麼當我們需要對ByteBuffer進行擴容或者縮容的時候如何根據SIZE_TABLE 決定擴容多少或者縮容多少呢??

這就用到了在AdaptiveRecvByteBufAllocator類中定義的擴容步長INDEX_INCREMENT = 4,縮容步長INDEX_DECREMENT = 1了。

我們就以上面兩副擴縮容容量索引表SIZE_TABLE 中的容量索引展示截圖為例,來介紹下擴縮容邏輯,假設我們當前ByteBuffer的容量索引為33,對應的容量為2048

4.3.1 擴容

當對容量為2048的ByteBuffer進行擴容時,根據當前的容量索引index = 33 加上 擴容步長INDEX_INCREMENT = 4計算出擴容後的容量索引為37,那麼擴縮容索引表SIZE_TABLE下標37對應的容量就是本次ByteBuffer擴容後的容量SIZE_TABLE[37] = 32768

4.3.1 縮容

同理對容量為2048的ByteBuffer進行縮容時,我們就需要用當前容量索引index = 33 減去 縮容步長INDEX_DECREMENT = 1計算出縮容後的容量索引32,那麼擴縮容索引表SIZE_TABLE下標32對應的容量就是本次ByteBuffer縮容後的容量SIZE_TABLE[32] = 1024

4.4 擴縮容時機

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
        @Override
        public final void read() {
            .........省略......
            try {
                do {
                      .........省略......
                } while (allocHandle.continueReading());

                //根據本次read loop總共讀取的位元組數,決定下次是否擴容或者縮容
                allocHandle.readComplete();

                .........省略.........

            } catch (Throwable t) {
                ...............省略...............
            } finally {
               ...............省略...............
            }
        }
}

在每輪read loop結束之後,我們都會調用allocHandle.readComplete()來根據在allocHandle中統計的在本輪read loop中讀取位元組總大小,來決定在下一輪read loop中是否對DirectByteBuffer進行擴容或者縮容。

public abstract class MaxMessageHandle implements ExtendedHandle {

       @Override
       public void readComplete() {
                //是否對recvbuf進行擴容縮容
                record(totalBytesRead());
       }

       private void record(int actualReadBytes) {
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }        
}

我們以當前ByteBuffer容量為2048,容量索引index = 33為例,對allocHandle的擴容縮容規則進行說明。

擴容步長INDEX_INCREMENT = 4,縮容步長INDEX_DECREMENT = 1

image.png

4.4.1 縮容

  • 如果本次OP_READ事件實際讀取到的總位元組數actualReadBytes在SIZE_TABLE[index - INDEX_DECREMENT]與SIZE_TABLE[index]之間的話,也就是如果本輪read loop結束之後總共讀取的位元組數在[1024,2048]之間。說明此時分配的ByteBuffer容量正好,不需要進行縮容也不需要進行擴容。
    比如本次actualReadBytes = 2000,正好處在10242048之間。說明2048的容量正好。

  • 如果actualReadBytes 小於等於 SIZE_TABLE[index - INDEX_DECREMENT],也就是如果本輪read loop結束之後總共讀取的位元組數小於等於1024。表示本次讀取到的位元組數比當前ByteBuffer容量的下一級容量還要小,說明當前ByteBuffer的容量分配的有些大了,設置縮容標識decreaseNow = true。當下次OP_READ事件繼續滿足縮容條件的時候,開始真正的進行縮容。縮容後的容量為SIZE_TABLE[index - INDEX_DECREMENT],但不能小於SIZE_TABLE[minIndex]。

註意需要滿足兩次縮容條件才會進行縮容,且縮容步長為1,縮容比較謹慎

4.4.2 擴容

如果本次OP_READ事件處理總共讀取的位元組數actualReadBytes 大於等於 當前ByteBuffer容量(nextReceiveBufferSize)時,說明ByteBuffer分配的容量有點小了,需要進行擴容。擴容後的容量為SIZE_TABLE[index + INDEX_INCREMENT],但不能超過SIZE_TABLE[maxIndex]。

滿足一次擴容條件就進行擴容,並且擴容步長為4, 擴容比較奔放

4.5 AdaptiveRecvByteBufAllocator類的實例化

AdaptiveRecvByteBufAllocator類的實例化主要是確定ByteBuffer的初始容量,以及最小容量和最大容量在擴縮容索引表SIZE_TABLE中的下標:minIndex maxIndex

AdaptiveRecvByteBufAllocator定義了三個關於ByteBuffer容量的欄位:

  • DEFAULT_MINIMUM :表示ByteBuffer最小的容量,預設為64,也就是無論ByteBuffer在怎麼縮容,容量也不會低於64

  • DEFAULT_INITIAL :表示ByteBuffer的初始化容量。預設為2048

  • DEFAULT_MAXIMUM :表示ByteBuffer的最大容量,預設為65536,也就是無論ByteBuffer在怎麼擴容,容量也不會超過65536

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 2048;
    static final int DEFAULT_MAXIMUM = 65536;

    public AdaptiveRecvByteBufAllocator() {
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
       
         .................省略異常檢查邏輯.............

        //計算minIndex maxIndex
        //在SIZE_TABLE中二分查找最小 >= minimum的容量索引 :3
        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        }

        //在SIZE_TABLE中二分查找最大 <= maximum的容量索引 :38
        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        }

        this.initial = initial;
    }
}

接下來的事情就是確定最小容量DEFAULT_MINIMUM 在SIZE_TABLE中的下標minIndex,以及最大容量DEFAULT_MAXIMUM 在SIZE_TABLE中的下標maxIndex

從AdaptiveRecvByteBufAllocator類初始化的過程中,我們可以看出SIZE_TABLE中存儲的數據特征是一個有序的集合。

我們可以通過二分查找在SIZE_TABLE中找出第一個容量大於等於DEFAULT_MINIMUM的容量索引minIndex

同理通過二分查找在SIZE_TABLE中找出最後一個容量小於等於DEFAULT_MAXIMUM的容量索引maxIndex

根據上一小節關於SIZE_TABLE中容量數據分佈的截圖,我們可以看出minIndex = 3maxIndex = 38

4.5.1 二分查找容量索引下標

    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;//無符號右移,高位始終補0
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

經常刷LeetCode的小伙伴肯定一眼就看出這個是二分查找的模板了。

它的目的就是根據給定容量,在擴縮容索引表SIZE_TABLE中,通過二分查找找到最貼近給定size的容量的索引下標(第一個大於等於 size的容量)

4.6 RecvByteBufAllocator.Handle

前邊我們提到最終動態調整ByteBuffer容量的是由AdaptiveRecvByteBufAllocator中的Handler負責的,我們來看下這個allocHandle的創建過程。

protected abstract class AbstractUnsafe implements Unsafe {

        private RecvByteBufAllocator.Handle recvHandle;

        @Override
        public RecvByteBufAllocator.Handle recvBufAllocHandle() {
            if (recvHandle == null) {
                recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }

}

從allocHandle的獲取過程我們看到最allocHandle的創建是由AdaptiveRecvByteBufAllocator#newHandle方法執行的。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }

    private final class HandleImpl extends MaxMessageHandle {
        //最小容量在擴縮容索引表中的index
        private final int minIndex;
        //最大容量在擴縮容索引表中的index
        private final int maxIndex;
        //當前容量在擴縮容索引表中的index 初始33 對應容量2048
        private int index;
        //預計下一次分配buffer的容量,初始:2048
        private int nextReceiveBufferSize;
        //是否縮容
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            //在擴縮容索引表中二分查找到最小大於等於initial 的容量
            index = getSizeTableIndex(initial);
            //2048
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        .......................省略...................
    }

}

這裡我們看到Netty中用於動態調整ByteBuffer容量的allocHandle 的實際類型為MaxMessageHandle

下麵我們來介紹下HandleImpl 中的核心欄位,它們都和ByteBuffer的容量有關:

  • minIndex :最小容量在擴縮容索引表SIZE_TABE中的index。預設是3

  • maxIndex :最大容量在擴縮容索引表SIZE_TABE中的index。預設是38

  • index :當前容量在擴縮容索引表SIZE_TABE中的index。初始是33

  • nextReceiveBufferSize :預計下一次分配buffer的容量,初始為2048。在每次申請記憶體分配ByteBuffer的時候,採用nextReceiveBufferSize的值指定容量。

  • decreaseNow : 是否需要進行縮容。

5. 使用堆外記憶體為ByteBuffer分配記憶體

AdaptiveRecvByteBufAllocator類只是負責動態調整ByteBuffer的容量,而具體為ByteBuffer申請記憶體空間的是由PooledByteBufAllocator負責。

5.1 類名首碼Pooled的來歷

在我們使用Java進行日常開發過程中,在為對象分配記憶體空間的時候我們都會選擇在JVM堆中為對象分配記憶體,這樣做對我們Java開發者特別的友好,我們只管使用就好而不必過多關心這塊申請的記憶體如何回收,因為JVM堆完全受Java虛擬機控制管理,Java虛擬機會幫助我們回收不再使用的記憶體。

但是JVM在進行垃圾回收時候的stop the world會對我們應用程式的性能造成一定的影響。

除此之外我們在《聊聊Netty那些事兒之從內核角度看IO模型》一文中介紹IO模型的時候提到,當數據達到網卡時,網卡會通過DMA的方式將數據拷貝到內核空間中,這是第一次拷貝。當用戶線程在用戶空間發起系統IO調用時,CPU會將內核空間的數據再次拷貝到用戶空間。這是第二次拷貝

於此不同的是當我們在JVM中發起IO調用時,比如我們使用JVM堆記憶體讀取Socket接收緩衝區中的數據時,會多一次記憶體拷貝,CPU在第二次拷貝中將數據從內核空間拷貝到用戶空間時,此時的用戶空間站在JVM角度是堆外記憶體,所以還需要將堆外記憶體中的數據拷貝到堆內記憶體中。這就是第三次記憶體拷貝

同理當我們在JVM中發起IO調用向Socket發送緩衝區寫入數據時,JVM會將IO數據先拷貝堆外記憶體,然後才能發起系統IO調用。

那為什麼操作系統不直接使用JVM的堆內記憶體進行IO操作呢?

因為JVM的記憶體佈局和操作系統分配的記憶體是不一樣的,操作系統不可能按照JVM規範來讀寫數據,所以就需要第三次拷貝中間做個轉換將堆外記憶體中的數據拷貝到JVM堆中。


所以基於上述內容,在使用JVM堆內記憶體時會產生以下兩點性能影響:

  1. JVM在垃圾回收堆內記憶體時,會發生stop the world導致應用程式卡頓。

  2. 在進行IO操作的時候,會多產生一次由堆外記憶體到堆內記憶體的拷貝。

基於以上兩點使用JVM堆內記憶體對性能造成的影響,於是對性能有卓越追求的Netty採用堆外記憶體也就是DirectBuffer來為ByteBuffer分配記憶體空間。

採用堆外記憶體為ByteBuffer分配記憶體的好處就是:

  • 堆外記憶體直接受操作系統的管理,不會受JVM的管理,所以JVM垃圾回收對應用程式的性能影響就沒有了。

  • 網路數據到達之後直接在堆外記憶體上接收,進程讀取網路數據時直接在堆外記憶體中讀取,所以就避免了第三次記憶體拷貝

所以Netty在進行 I/O 操作時都是使用的堆外記憶體,可以避免數據從 JVM 堆記憶體到堆外記憶體的拷貝。但是由於堆外記憶體不受JVM的管理,所以就需要額外關註對記憶體的使用和釋放,稍有不慎就會造成記憶體泄露,於是Netty就引入了記憶體池堆外記憶體進行統一管理。

PooledByteBufAllocator類的這個首碼Pooled就是記憶體池的意思,這個類會使用Netty的記憶體池為ByteBuffer分配堆外記憶體

5.2 PooledByteBufAllocator的創建

創建時機

在服務端NioServerSocketChannel的配置類NioServerSocketChannelConfig以及客戶端NioSocketChannel的配置類NioSocketChannelConfig實例化的時候會觸發PooledByteBufAllocator的創建。

public class DefaultChannelConfig implements ChannelConfig {
    //PooledByteBufAllocator
    private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

    ..........省略......
}

創建出來的PooledByteBufAllocator實例保存在DefaultChannelConfig類中的ByteBufAllocator allocator欄位中。

創建過程

public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
    
    ..................省略............
}
public final class ByteBufUtil {

    static final ByteBufAllocator DEFAULT_ALLOCATOR;

    static {
        String allocType = SystemPropertyUtil.get(
                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();

        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }

        DEFAULT_ALLOCATOR = alloc;
        
        ...................省略..................
    }
}

從ByteBufUtil類的初始化過程我們可以看出,在為ByteBuffer分配記憶體的時候是否使用記憶體池在Netty中是可以配置的。

  • 通過系統變數-D io.netty.allocator.type 可以配置是否使用記憶體池為ByteBuffer分配記憶體。預設情況下是需要使用記憶體池的。但是在安卓系統中預設是不使用記憶體池的。

  • 通過PooledByteBufAllocator.DEFAULT獲取記憶體池ByteBuffer分配器

   public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

由於本文的主線是介紹Sub Reactor處理OP_READ事件的完整過程,所以這裡只介紹主線相關的內容,這裡只是簡單介紹下在接收數據的時候為什麼會用PooledByteBufAllocator來為ByteBuffer分配記憶體。而記憶體池的架構設計比較複雜,所以筆者後面會單獨寫一篇關於Netty記憶體管理的文章。


總結

本文介紹了Sub Reactor線程在處理OP_READ事件的整個過程。並深入剖析了AdaptiveRecvByteBufAllocator類動態調整ByteBuffer容量的原理。

同時也介紹了Netty為什麼會使用堆外記憶體來為ByteBuffer分配記憶體,並由此引出了Netty的記憶體池分配器PooledByteBufAllocator 。

在介紹AdaptiveRecvByteBufAllocator類和PooledByteBufAllocator一起組合實現動態地為ByteBuffer分配容量的時候,筆者不禁想起了多年前看過的《Effective Java》中第16條 複合優先於繼承

Netty在這裡也遵循了這條軍規,首先兩個類設計的都是單一的功能。

  • AdaptiveRecvByteBufAllocator類只負責動態的調整ByteBuffer容量,並不管具體的記憶體分配。

  • PooledByteBufAllocator類負責具體的記憶體分配,用記憶體池的方式。

這樣設計的就比較靈活,具體記憶體分配的工作交給具體的ByteBufAllocator,可以使用記憶體池的分配方式PooledByteBufAllocator,也可以不使用記憶體池的分配方式UnpooledByteBufAllocator。具體的記憶體可以採用JVM堆內記憶體(HeapBuffer),也可以使用堆外記憶體(DirectBuffer)。

AdaptiveRecvByteBufAllocator只需要關註調整它們的容量工作就可以了,而並不需要關註它們具體的記憶體分配方式。

最後通過io.netty.channel.RecvByteBufAllocator.Handle#allocate方法靈活組合不同的記憶體分配方式。這也是裝飾模式的一種應用。

byteBuf = allocHandle.allocate(allocator);

好了,今天的內容就到這裡,我們下篇文章見~~~~

閱讀原文

歡迎關註公眾號:bin的技術小屋


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這18個網站是我在取經路上意外發現的,裡面包括 純CSS 實現的炫酷背景,還有專門製作背景圖的網站。 算是取經路上的大補之物~ 1. CSS3 Patterns Gallery 🎗️ 傳送門:『CSS3 Patterns Gallery』 如果你認識 Lea Verou 的話,大概率知道這個網站, ...
  • 最近,在 CodePen 上看到這樣一個非常有意思的效果: 這個效果的核心難點在於氣泡的一種特殊融合效果。 其源代碼在:CodePen Demo -- Goey footer,作者主要使用的是 SVG 濾鏡完成的該效果,感興趣的可以戳源碼看看。 其中,要想靈活運用 SVG 中的 feGaussian ...
  • 1、樣式 1.1 行內樣式 <h1 style="color:red;">行內樣式</h1> 1.2 內部樣式 CSS代碼寫在 <head> 的 <style> 標簽中 <style> h1{color: green; } </style> 1.3 外部樣式 <link rel="styleshee ...
  • 本文簡介 你負責點贊,我負責更新~ 這次要用純CSS做一個波點背景,先上圖看看效果。 我把這個效果寫在 body 上,如果你不喜歡這個配色也可以自己手動改改。 思路 我實現上圖的效果思路是,最先想到使用 background-image ,然後使用 radial-gradient 畫圓。再配合預設給 ...
  • 講述在單片機軟體開發過程中如何更好地實現各個模塊的數據交互,降低耦合 ...
  • 恢復內容開始 選擇結構 選擇結構分為為i語句和switch語句 if語句 1.if單選擇語句 結構圖:(用輸出語句打了一個簡單結構圖) 代碼示例: public class IfDemo01 { public static void main(String[] args) { /*if單選擇語句 i ...
  • 搭建免費的代理ip池 需要解決的問題: 使用什麼方式存儲ip 文件存儲 缺點: 打開文件修改文件操作較麻煩 mysql 缺點: 查詢速度較慢 mongodb 缺點: 查詢速度較慢. 沒有查重功能 redis --> 使用redis存儲最為合適 所以 -> 數據結構採用redis中的zset有序集合 ...
  • 順序結構 java的基本結構有順序結構、選擇結構、迴圈結構。我們先來學習順序結構 java的基本結構就是順序結構,也就是java按照順序一步一步執行,除非有特殊說明。 順序結構也是最簡單的演算法結構 語句與語句之間,框與框之間是按從上到下的順序進行的,它是由若幹個一次執行的處理步驟組成的,它是任何一個 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...