本文介紹了NioServerSocketChannel處理客戶端連接事件的整個過程。接收連接的整個處理框架。影響Netty接收連接吞吐的Bug產生的原因,以及修複的方案。創建並初始化客戶端NioSocketChannel。初始化NioSocketChannel中的pipeline。客戶端NioSoc... ...
本系列Netty源碼解析文章基於 4.1.56.Final版本
對於一個高性能網路通訊框架來說,最最重要也是最核心的工作就是如何高效的接收客戶端連接,這就好比我們開了一個飯店,那麼迎接客人就是飯店最重要的工作,我們要先把客人迎接進來,不能讓客人一看人多就走掉,只要客人進來了,哪怕菜做的慢一點也沒關係。
本文筆者就來為大家介紹下netty這塊最核心的內容,看看netty是如何高效的接收客戶端連接的。
下圖為筆者在一個月黑風高天空顯得那麼深邃遙遠的夜晚,閑來無事,於是捧起Netty關於如何接收連接這部分源碼細細品讀的時候,意外的發現了一個影響Netty接收連接吞吐的一個Bug。
於是筆者就在Github提了一個Issue#11708,闡述了下這個Bug產生的原因以及導致的結果並和Netty的作者一起討論了下修複措施。如上圖所示。
Issue#11708:https://github.com/netty/netty/issues/11708
這裡先不詳細解釋這個Issue,也不建議大家現在就打開這個Issue查看,筆者會在本文的介紹中隨著源碼深入的解讀慢慢的為大家一層一層地撥開迷霧。
之所以在文章的開頭把這個拎出來,筆者是想讓大家帶著懷疑,審視,欣賞,崇敬,敬畏的態度來一起品讀世界頂級程式員編寫的代碼。由衷的感謝他們在這一領域做出的貢獻。
好了,問題拋出來後,我們就帶著這個疑問來開始本文的內容吧~~~
前文回顧
按照老規矩,再開始本文的內容之前,我們先來回顧下前邊幾篇文章的概要內容幫助大家梳理一個框架全貌出來。
筆者這裡再次想和讀者朋友們強調的是本文可以獨立觀看,並不依賴前邊系列文章的內容,只是大家如果對相關細節部分感興趣的話,可以在閱讀完本文之後在去回看相關文章。
在前邊的系列文章中,筆者為大家介紹了驅動Netty整個框架運轉的核心引擎Reactor的創建,啟動,運行的全流程。從現在開始Netty的整個核心框架就開始運轉起來開始工作了,本文要介紹的主要內容就是Netty在啟動之後要做的第一件事件:監聽埠地址,高效接收客戶端連接。
在《聊聊Netty那些事兒之從內核角度看IO模型》一文中,我們是從整個網路框架的基石IO模型的角度整體闡述了下Netty的IO線程模型。
而Netty中的Reactor正是IO線程在Netty中的模型定義。Reactor在Netty中是以Group的形式出現的,分為:
-
主Reactor線程組也就是我們在啟動代碼中配置的
EventLoopGroup bossGroup
,main reactor group中的reactor主要負責監聽客戶端連接事件,高效的處理客戶端連接。也是本文我們要介紹的重點。 -
從Reactor線程組也就是我們在啟動代碼中配置的
EventLoopGroup workerGroup
,sub reactor group中的reactor主要負責處理客戶端連接上的IO事件,以及非同步任務的執行。
最後我們得出Netty的整個IO模型如下:
本文我們討論的重點就是MainReactorGroup的核心工作上圖中所示的步驟1,步驟2,步驟3。
在從整體上介紹完Netty的IO模型之後,我們又在《Reactor在Netty中的實現(創建篇)》中完整的介紹了Netty框架的骨架主從Reactor組的搭建過程,闡述了Reactor是如何被創建出來的,並介紹了它的核心組件如下圖所示:
-
thread
即為Reactor中的IO線程,主要負責監聽IO事件,處理IO任務,執行非同步任務。 -
selector
則是JDK NIO對操作系統底層IO多路復用技術實現的封裝。用於監聽IO就緒事件。 -
taskQueue
用於保存Reactor需要執行的非同步任務,這些非同步任務可以由用戶在業務線程中向Reactor提交,也可以是Netty框架提交的一些自身核心的任務。 -
scheduledTaskQueue
則是保存Reactor中執行的定時任務。代替了原有的時間輪來執行延時任務。 -
tailQueue
保存了在Reactor需要執行的一些尾部收尾任務,在普通任務執行完後 Reactor線程會執行尾部任務,比如對Netty 的運行狀態做一些統計數據,例如任務迴圈的耗時、占用物理記憶體的大小等等
在骨架搭建完畢之後,我們隨後又在在《詳細圖解Netty Reactor啟動全流程》》一文中介紹了本文的主角服務端NioServerSocketChannel的創建,初始化,綁定埠地址,向main reactor註冊監聽OP_ACCEPT事件
的完整過程。
main reactor如何處理OP_ACCEPT事件將會是本文的主要內容。
自此Netty框架的main reactor group已經啟動完畢,開始準備監聽OP_accept事件,當客戶端連接上來之後,OP_ACCEPT事件活躍,main reactor開始處理OP_ACCEPT事件接收客戶端連接了。
而netty中的IO事件分為:OP_ACCEPT事件,OP_READ事件,OP_WRITE事件和OP_CONNECT事件,netty對於IO事件的監聽和處理統一封裝在Reactor模型中,這四個IO事件的處理過程也是我們後續文章中要單獨拿出來介紹的,本文我們聚焦OP_ACCEPT事件的處理。
而為了讓大家能夠對IO事件的處理有一個完整性的認識,筆者寫了《一文聊透Netty核心引擎Reactor的運轉架構》這篇文章,在文章中詳細介紹了Reactor線程的整體運行框架。
Reactor線程會在一個死迴圈中996不停的運轉,在迴圈中會不斷的輪詢監聽Selector上的IO事件,當IO事件活躍後,Reactor從Selector上被喚醒轉去執行IO就緒事件的處理,在這個過程中我們引出了上述四種IO事件的處理入口函數。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//獲取Channel的底層操作類Unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
......如果SelectionKey已經失效則關閉對應的Channel......
}
try {
//獲取IO就緒事件
int readyOps = k.readyOps();
//處理Connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
//移除對Connect事件的監聽,否則Selector會一直通知
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//觸發channelActive事件處理Connect事件
unsafe.finishConnect();
}
//處理Write事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//處理Read事件或者Accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
本文筆者將會為大家重點介紹OP_ACCEPT事件
的處理入口函數unsafe.read()
的整個源碼實現。
當客戶端連接完成三次握手之後,main reactor中的selector產生OP_ACCEPT事件
活躍,main reactor隨即被喚醒,來到了OP_ACCEPT事件
的處理入口函數開始接收客戶端連接。
1. Main Reactor處理OP_ACCEPT事件
當Main Reactor
輪詢到NioServerSocketChannel
上的OP_ACCEPT事件
就緒時,Main Reactor線程就會從JDK Selector
上的阻塞輪詢APIselector.select(timeoutMillis)
調用中返回。轉而去處理NioServerSocketChannel
上的OP_ACCEPT事件
。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
..............省略.................
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
..............處理OP_CONNECT事件.................
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
..............處理OP_WRITE事件.................
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//本文重點處理OP_ACCEPT事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
-
處理IO就緒事件的入口函數
processSelectedKey
中的參數AbstractNioChannel ch
正是Netty服務端NioServerSocketChannel
。因為此時的執行線程為main reactor線程,而main reactor上註冊的正是netty服務端NioServerSocketChannel負責監聽埠地址,接收客戶端連接。 -
通過
ch.unsafe()
獲取到的NioUnsafe操作類正是NioServerSocketChannel中對底層JDK NIO ServerSocketChannel的Unsafe底層操作類。
Unsafe介面
是Netty對Channel底層操作行為的封裝,比如NioServerSocketChannel的底層Unsafe操作類乾的事情就是綁定埠地址
,處理OP_ACCEPT事件
。
這裡我們看到,Netty將OP_ACCEPT事件
處理的入口函數封裝在NioServerSocketChannel
里的底層操作類Unsafe的read
方法中。
而NioServerSocketChannel中的Unsafe操作類實現類型為NioMessageUnsafe
定義在上圖繼承結構中的AbstractNioMessageChannel父類中
。
下麵我們到NioMessageUnsafe#read
方法中來看下Netty對OP_ACCPET事件
的具體處理過程:
2. 接收客戶端連接核心流程框架總覽
我們還是按照老規矩,先從整體上把整個OP_ACCEPT事件的邏輯處理框架提取出來,讓大家先總體俯視下流程全貌,然後在針對每個核心點位進行各個擊破。
main reactor線程是在一個do...while{...}
迴圈read loop中不斷的調用JDK NIO serverSocketChannel.accept()
方法來接收完成三次握手的客戶端連接NioSocketChannel
的,並將接收到的客戶端連接NioSocketChannel臨時保存在List<Object> readBuf
集合中,後續會服務端NioServerSocketChannel的pipeline中通過ChannelRead事件來傳遞,最終會在ServerBootstrapAcceptor這個ChannelHandler中被處理初始化,並將其註冊到Sub Reator Group中。
這裡的read loop迴圈會被限定只能讀取16次,當main reactor從NioServerSocketChannel中讀取客戶端連接NioSocketChannel的次數達到16次之後,無論此時是否還有客戶端連接都不能在繼續讀取了。
因為我們在《一文聊透Netty核心引擎Reactor的運轉架構》一文中提到,netty對reactor線程壓榨的比較狠,要乾的事情很多,除了要監聽輪詢IO就緒事件,處理IO就緒事件,還需要執行用戶和netty框架本省提交的非同步任務和定時任務。
所以這裡的main reactor線程不能在read loop中無限制的執行下去,因為還需要分配時間去執行非同步任務,不能因為無限制的接收客戶端連接而耽誤了非同步任務的執行。所以這裡將read loop的迴圈次數限定為16次。
如果main reactor線程在read loop中讀取客戶端連接NioSocketChannel的次數已經滿了16次,即使此時還有客戶端連接未接收,那麼main reactor線程也不會再去接收了,而是轉去執行非同步任務,當非同步任務執行完畢後,還會在回來執行剩餘接收連接的任務。
main reactor線程退出read loop迴圈的條件有兩個:
-
在限定的16次讀取中,已經沒有新的客戶端連接要接收了。退出迴圈。
-
從NioServerSocketChannel中讀取客戶端連接的次數達到了16次,無論此時是否還有客戶端連接都需要退出迴圈。
以上就是Netty在接收客戶端連接時的整體核心邏輯,下麵筆者將這部分邏輯的核心源碼實現框架提取出來,方便大家根據上述核心邏輯與源碼中的處理模塊對應起來,還是那句話,這裡只需要總體把握核心處理流程,不需要讀懂每一行代碼,筆者會在文章的後邊分模塊來各個擊破它們。
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//存放連接建立後,創建的客戶端SocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
//必須在Main Reactor線程中執行
assert eventLoop().inEventLoop();
//註意下麵的config和pipeline都是服務端ServerSocketChannel中的
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//創建接收數據Buffer分配器(用於分配容量大小合適的byteBuffer用來容納接收數據)
//在接收連接的場景中,這裡的allocHandle只是用於控制read loop的迴圈讀取創建連接的次數。
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
int localRead = doReadMessages(readBuf);
//已無新的連接可接收則退出read loop
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//統計在當前事件迴圈中已經讀取到得Message數量(創建連接的個數)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());//判斷是否已經讀滿16次
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//在NioServerSocketChannel對應的pipeline中傳播ChannelRead事件
//初始化客戶端SocketChannel,並將其綁定到Sub Reactor線程組中的一個Reactor上
pipeline.fireChannelRead(readBuf.get(i));
}
//清除本次accept 創建的客戶端SocketChannel集合
readBuf.clear();
allocHandle.readComplete();
//觸發readComplete事件傳播
pipeline.fireChannelReadComplete();
....................省略............
} finally {
....................省略............
}
}
}
}
}
這裡首先要通過斷言 assert eventLoop().inEventLoop()
確保處理接收客戶端連接的線程必須為Main Reactor 線程。
而main reactor中主要註冊的是服務端NioServerSocketChannel,主要負責處理OP_ACCEPT事件
,所以當前main reactor線程是在NioServerSocketChannel中執行接收連接的工作。
所以這裡我們通過config()
獲取到的是NioServerSocketChannel的屬性配置類NioServerSocketChannelConfig
,它是在Reactor的啟動階段被創建出來的。
public NioServerSocketChannel(ServerSocketChannel channel) {
//父類AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監聽的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中設置用於Channel接收數據用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
同理這裡通過pipeline()
獲取到的也是NioServerSocketChannel中的pipeline
。它會在NioServerSocketChannel向main reactor註冊成功之後被初始化。
前邊提到main reactor線程會被限定只能在read loop中向NioServerSocketChannel讀取16次客戶端連接,所以在開始read loop之前,我們需要創建一個能夠保存記錄讀取次數的對象,在每次read loop迴圈之後,可以根據這個對象來判斷是否結束read loop。
這個對象就是這裡的 RecvByteBufAllocator.Handle allocHandle
專門用於統計read loop中接收客戶端連接的次數,以及判斷是否該結束read loop轉去執行非同步任務。
當這一切準備就緒之後,main reactor線程就開始在do{....}while(...)
迴圈中接收客戶端連接了。
在 read loop中通過調用doReadMessages函數
接收完成三次握手的客戶端連接,底層會調用到JDK NIO ServerSocketChannel的accept方法,從內核全連接隊列中取出客戶端連接。
返回值localRead
表示接收到了多少客戶端連接,客戶端連接通過accept方法只會一個一個的接收,所以這裡的localRead
正常情況下都會返回1
,當localRead <= 0
時意味著已經沒有新的客戶端連接可以接收了,本次main reactor接收客戶端的任務到這裡就結束了,跳出read loop。開始新的一輪IO事件的監聽處理。
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
隨後會將接收到的客戶端連接占時存放到List<Object> readBuf
集合中。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//存放連接建立後,創建的客戶端SocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
}
調用allocHandle.incMessagesRead
統計本次事件迴圈中接收到的客戶端連接個數,最後在read loop末尾通過allocHandle.continueReading
判斷是否達到了限定的16次。從而決定main reactor線程是繼續接收客戶端連接還是轉去執行非同步任務。
main reactor線程退出read loop的兩個條件:
-
在限定的16次讀取中,已經沒有新的客戶端連接要接收了。退出迴圈。
-
從NioServerSocketChannel中讀取客戶端連接的次數達到了16次,無論此時是否還有客戶端連接都需要退出迴圈。
當滿足以上兩個退出條件時,main reactor線程就會退出read loop,由於在read loop中接收到的客戶端連接全部暫存在List<Object> readBuf
集合中,隨後開始遍歷readBuf,在NioServerSocketChannel的pipeline中傳播ChannelRead事件。
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//NioServerSocketChannel對應的pipeline中傳播read事件
//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead
//初始化客戶端SocketChannel,並將其綁定到Sub Reactor線程組中的一個Reactor上
pipeline.fireChannelRead(readBuf.get(i));
}
最終pipeline中的ChannelHandler(ServerBootstrapAcceptor)會響應ChannelRead事件,併在相應回調函數中初始化客戶端NioSocketChannel,並將其註冊到Sub Reactor Group中。此後客戶端NioSocketChannel綁定到的sub reactor就開始監聽處理客戶端連接上的讀寫事件了。
Netty整個接收客戶端的邏輯過程如下圖步驟1,2,3所示。
以上內容就是筆者提取出來的整體流程框架,下麵我們來將其中涉及到的重要核心模塊拆開,一個一個詳細解讀下。
3. RecvByteBufAllocator簡介
Reactor在處理對應Channel上的IO數據時,都會採用一個ByteBuffer
來接收Channel上的IO數據。而本小節要介紹的RecvByteBufAllocator正是用來分配ByteBuffer的一個分配器。
還記得這個RecvByteBufAllocator
在哪裡被創建的嗎??
在《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》一文中,在介紹NioServerSocketChannel
的創建過程中提到,對應Channel的配置類NioServerSocketChannelConfig也會隨著NioServerSocketChannel的創建而創建。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
在創建NioServerSocketChannelConfig
的過程中會創建RecvByteBufAllocator
。
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
這裡我們看到NioServerSocketChannel中的RecvByteBufAllocator實際類型為AdaptiveRecvByteBufAllocator
,顧名思義,這個類型的RecvByteBufAllocator可以根據Channel上每次到來的IO數據大小來自適應動態調整ByteBuffer的容量。
對於服務端NioServerSocketChannel來說,它上邊的IO數據就是客戶端的連接,它的長度和類型都是固定的,所以在接收客戶端連接的時候並不需要這樣的一個ByteBuffer來接收,我們會將接收到的客戶端連接存放在List<Object> readBuf
集合中
對於客戶端NioSocketChannel來說,它上邊的IO數據時客戶端發送來的網路數據,長度是不定的,所以才會需要這樣一個可以根據每次IO數據的大小來自適應動態調整容量的ByteBuffer來接收。
那麼看起來這個RecvByteBufAllocator和本文的主題不是很關聯,因為在接收連接的過程中並不會怎麼用到它,這個類筆者還會在後面的文章中詳細介紹,之所以這裡把它拎出來單獨介紹是因為它和本文開頭提到的Bug有關係,這個Bug就是由這個類引起的。
3.1 RecvByteBufAllocator.Handle的獲取
在本文中,我們是通過NioServerSocketChannel中的unsafe底層操作類來獲取RecvByteBufAllocator.Handle的
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
}
我們看到最終會在NioServerSocketChannel的配置類NioServerSocketChannelConfig中獲取到AdaptiveRecvByteBufAllocator
public class DefaultChannelConfig implements ChannelConfig {
//用於Channel接收數據用的buffer分配器 類型為AdaptiveRecvByteBufAllocator
private volatile RecvByteBufAllocator rcvBufAllocator;
}
AdaptiveRecvByteBufAllocator
中會創建自適應動態調整容量的ByteBuffer分配器。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}
private final class HandleImpl extends MaxMessageHandle {
.................省略................
}
}
這裡的newHandle
方法返回的具體類型為MaxMessageHandle
,這個MaxMessageHandle
裡邊保存了每次從Channel
中讀取IO數據
的容量指標,方便下次讀取時分配合適大小的buffer
。
每次在使用allocHandle
前需要調用allocHandle.reset(config);
重置裡邊的統計指標。
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
//每次事件輪詢時,最多讀取16次
private int maxMessagePerRead;
//本次事件輪詢總共讀取的message數,這裡指的是接收連接的數量
private int totalMessages;
//本次事件輪詢總共讀取的位元組數
private int totalBytesRead;
@Override
public void reset(ChannelConfig config) {
this.config = config;
//預設每次最多讀取16次
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
}
- maxMessagePerRead:用於控制每次read loop里最大可以迴圈讀取的次數,預設為16次,可在啟動配置類
ServerBootstrap
中通過ChannelOption.MAX_MESSAGES_PER_READ
選項設置。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.MAX_MESSAGES_PER_READ, 自定義次數)
- totalMessages:用於統計read loop中總共接收的連接個數,每次read loop迴圈後會調用
allocHandle.incMessagesRead
增加記錄接收到的連接個數。
@Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
- totalBytesRead:用於統計在read loop中總共接收到客戶端連接上的數據大小,這個欄位主要用於sub reactor在接收客戶端NioSocketChannel上的網路數據用的,本文我們介紹的是main reactor接收客戶端連接,所以這裡並不會用到這個欄位。這個欄位會在sub reactor每次讀取完NioSocketChannel上的網路數據時增加記錄。
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
}
}
MaxMessageHandler中還有一個非常重要的方法就是在每次read loop末尾會調用allocHandle.continueReading()
方法來判斷讀取連接次數是否已滿16次,來決定main reactor線程是否退出迴圈。
do {
//底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//統計在當前事件迴圈中已經讀取到得Message數量(創建連接的個數)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
紅框中圈出來的兩個判斷條件和本文主題無關,我們這裡不需要關註,筆者會在後面的文章詳細介紹。
-
totalMessages < maxMessagePerRead
:在本文的接收客戶端連接場景中,這個條件用於判斷main reactor線程在read loop中的讀取次數是否超過了16次。如果超過16次就會返回false,main reactor線程退出迴圈。 -
totalBytesRead > 0
:用於判斷當客戶端NioSocketChannel上的OP_READ事件活躍時,sub reactor線程在read loop中是否讀取到了網路數據。
以上內容就是RecvByteBufAllocator.Handle在接收客戶端連接場景下的作用,大家這裡仔細看下這個allocHandle.continueReading()
方法退出迴圈的判斷條件,再結合整個do{....}while(...)
接收連接迴圈體,感受下是否哪裡有些不對勁?Bug即將出現~~~
4. 啊哈!!Bug ! !
netty不論是在本文中處理接收客戶端連接的場景還是在處理接收客戶端連接上的網路數據場景都會在一個do{....}while(...)
迴圈read loop中不斷的處理。
同時也都會利用在上一小節中介紹的RecvByteBufAllocator.Handle
來記錄每次read loop接收到的連接個數和從連接上讀取到的網路數據大小。
從而在read loop的末尾都會通過allocHandle.continueReading()
方法判斷是否應該退出read loop迴圈結束連接的接收流程或者是結束連接上數據的讀取流程。
無論是用於接收客戶端連接的main reactor也好還是用於接收客戶端連接上的網路數據的sub reactor也好,它們的運行框架都是一樣的,只不過是具體分工不同。
所以netty這裡想用統一的RecvByteBufAllocator.Handle
來處理以上兩種場景。
而RecvByteBufAllocator.Handle
中的totalBytesRead
欄位主要記錄sub reactor線程在處理客戶端NioSocketChannel中OP_READ事件活躍時,總共在read loop中讀取到的網路數據,而這裡是main reactor線程在接收客戶端連接所以這個欄位並不會被設置。totalBytesRead欄位的值在本文中永遠會是0
。
所以無論同時有多少個客戶端併發連接到服務端上,在接收連接的這個read loop中永遠只會接受一個連接就會退出迴圈,因為allocHandle.continueReading()方法
中的判斷條件totalBytesRead > 0
永遠會返回false
。
do {
//底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//統計在當前事件迴圈中已經讀取到得Message數量(創建連接的個數)
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
而netty的本意是在這個read loop迴圈中儘可能多的去接收客戶端的併發連接,同時又不影響main reactor線程執行非同步任務。但是由於這個Bug,main reactor在這個迴圈中只執行一次就結束了。這也一定程度上就影響了netty的吞吐。
讓我們想象下這樣的一個場景,當有16個客戶端同時併發連接到了服務端,這時NioServerSocketChannel上的OP_ACCEPT事件
活躍,main reactor從Selector上被喚醒,隨後執行OP_ACCEPT事件
的處理。
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
............省略.........
case SelectStrategy.BUSY_WAIT:
............省略.........
case SelectStrategy.SELECT:
............監聽輪詢IO事件.........
default:
}
} catch (IOException e) {
............省略.........
}
............處理IO就緒事件.........
............執行非同步任務.........
}
}
但是由於這個Bug的存在,main reactor在接收客戶端連接的這個read loop中只接收了一個客戶端連接就匆匆返回了。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
do {
int localRead = doReadMessages(readBuf);
.........省略...........
} while (allocHandle.continueReading());
}
然後根據下圖中這個Reactor的運行結構去執行非同步任務,隨後繞一大圈又會回到NioEventLoop#run
方法中重新發起一輪OP_ACCEPT事件輪詢。
由於現在還有15個客戶端併發連接沒有被接收,所以此時Main Reactor線程並不會在selector.select()
上阻塞,最終繞一圈又會回到NioMessageUnsafe#read
方法的do{.....}while()
迴圈。在接收一個連接之後又退出迴圈。
本來我們可以在一次read loop中把這16個併發的客戶端連接全部接收完畢的,因為這個Bug,main reactor需要不斷的發起OP_ACCEPT事件的輪詢,繞了很大一個圈子。同時也增加了許多不必要的selector.select()系統調用開銷
這時大家在看這個Issue#11708中的討論是不是就清晰很多了~~
Issue#11708:https://github.com/netty/netty/issues/11708
4.1 Bug的修複
筆者在寫這篇文章的時候,Netty最新版本是4.1.68.final,這個Bug在4.1.69.final中被修複。
由於該Bug產生的原因正是因為服務端NioServerSocketChannel(用於監聽埠地址和接收客戶端連接)和 客戶端NioSocketChannel(用於通信)中的Config配置類混用了同一個ByteBuffer分配器AdaptiveRecvByteBufAllocator
而導致的。
所以在新版本修複中專門為服務端ServerSocketChannel中的Config配置類引入了一個新的ByteBuffer分配器ServerChannelRecvByteBufAllocator
,專門用於服務端ServerSocketChannel接收客戶端連接的場景。
在ServerChannelRecvByteBufAllocator
的父類DefaultMaxMessagesRecvByteBufAllocator
中引入了一個新的欄位ignoreBytesRead
,用於表示是否忽略網路位元組的讀取,在創建服務端Channel配置類NioServerSocketChannelConfig的時候,這個欄位會被賦值為true
。
當main reactor線程在read loop迴圈中接收客戶端連接的時候。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
do {
int localRead = doReadMessages(readBuf);
.........省略...........
} while (allocHandle.continueReading());
}
在read loop迴圈的末尾就會採用從ServerChannelRecvByteBufAllocator
中創建的MaxMessageHandle#continueReading
方法來判斷讀取連接次數是否超過了16次。由於這裡的ignoreBytesRead == true
這回我們就會忽略totalBytesRead == 0
的情況,從而使得接收連接的read loop得以繼續地執行下去。在一個read loop中一次性把16個連接全部接收完畢。
以上就是對這個Bug產生的原因,以及發現的過程,最後修複的方案一個全面的介紹,因此筆者也出現在了netty 4.1.69.final版本發佈公告里的thank-list中。哈哈,真是令人開心的一件事情~~~
通過以上對netty接收客戶端連接的全流程分析和對這個Bug來龍去脈以及修複方案的介紹,大家現在一定已經理解了整個接收連接的流程框架。
接下來筆者就把這個流程中涉及到的一些核心模塊在單獨拎出來從細節入手,為大家各個擊破~~~
5. doReadMessages接收客戶端連接
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
}
- 通過
javaChannel()
獲取封裝在Netty服務端NioServerSocketChannel
中的JDK 原生 ServerSocketChannel
。
@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
- 通過
JDK NIO 原生
的ServerSocketChannel
的accept方法
獲取JDK NIO 原生
客戶端連接SocketChannel
。
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
這一步就是我們在《聊聊Netty那些事兒之從內核角度看IO模型》介紹到的調用監聽Socket
的accept方法
,內核會基於監聽Socket
創建出來一個新的Socket
專門用於與客戶端之間的網路通信這個我們稱之為客戶端連接Socket
。這裡的ServerSocketChannel
就類似於監聽Socket
。SocketChannel
就類似於客戶端連接Socket
。
由於我們在創建NioServerSocketChannel
的時候,會將JDK NIO 原生
的ServerSocketChannel
設置為非阻塞
,所以這裡當ServerSocketChannel
上有客戶端連接時就會直接創建SocketChannel
,如果此時並沒有客戶端連接時accept調用
就會立刻返回null
並不會阻塞。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//設置Channel為非阻塞 配合IO多路復用模型
ch.configureBlocking(false);
} catch (IOException e) {
..........省略.............
}
}
5.1 創建客戶端NioSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
.........省略.......
}
return 0;
}
}
這裡會根據ServerSocketChannel
的accept
方法獲取到JDK NIO 原生
的SocketChannel
(用於底層真正與客戶端通信的Channel),來創建Netty中的NioSocketChannel
。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
}
創建客戶端NioSocketChannel
的過程其實和之前講的創建服務端NioServerSocketChannel
大體流程是一樣的,我們這裡只對客戶端NioSocketChannel
和服務端NioServerSocketChannel
在創建過程中的不同之處做一個對比。
具體細節部分大家可以在回看下《詳細圖解Netty Reactor啟動全流程》一文中關於
NioServerSocketChannel
的創建的詳細細節。
5.3 對比NioSocketChannel與NioServerSocketChannel的不同
1:Channel的層次不同
在我們介紹Reactor的創建文章中,我們提到Netty中的Channel
是具有層次的。由於客戶端NioSocketChannel是在main reactor接收連接時在服務端NioServerSocketChannel中被創建的,所以在創建客戶端NioSocketChannel的時候會通過構造函數指定了parent屬性為NioServerSocketChanel
。並將JDK NIO 原生
的SocketChannel
封裝進Netty的客戶端NioSocketChannel
中。
而在Reactor啟動過程中創建NioServerSocketChannel
的時候parent屬性
指定是null
。因為它就是頂層的Channel
,負責創建客戶端NioSocketChannel
。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
2:向Reactor註冊的IO事件不同
客戶端NioSocketChannel向Sub Reactor註冊的是SelectionKey.OP_READ事件
,而服務端NioServerSocketChannel向Main Reactor註冊的是SelectionKey.OP_ACCEPT事件
。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
}
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
public NioServerSocketChannel(ServerSocketChannel channel) {
//父類AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監聽的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中設置用於Channel接收數據用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
3: 功能屬性不同造成繼承結構的不同
客戶端NioSocketChannel
繼承的是AbstractNioByteChannel
,而服務端NioServerSocketChannel
繼承的是AbstractNioMessageChannel
。
它們繼承的這兩個抽象類一個首碼是Byte
,一個首碼是Message
有什麼區別嗎??
客戶端
NioSocketChannel
主要處理的是服務端與客戶端的通信,這裡涉及到接收客戶端發送來的數據,而Sub Reactor線程
從NioSocketChannel
中讀取的正是網路通信數據單位為Byte
。
服務端
NioServerSocketChannel
主要負責處理OP_ACCEPT事件
,創建用於通信的客戶端NioSocketChannel
。這時候客戶端與服務端還沒開始通信,所以Main Reactor線程
從NioServerSocketChannel
的讀取對象為Message
。這裡的Message
指的就是底層的SocketChannel
客戶端連接。
以上就是NioSocketChannel
與NioServerSocketChannel
創建過程中的不同之處,後面的過程就一樣了。
- 在AbstractNioChannel 類中封裝JDK NIO 原生的
SocketChannel
,並將其底層的IO模型設置為非阻塞
,保存需要監聽的IO事件OP_READ
。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//設置Channel為非阻塞 配合IO多路復用模型
ch.configureBlocking(false);
} catch (IOException e) {
}
}
- 為客戶端NioSocketChannel創建全局唯一的
channelId
,創建客戶端NioSocketChannel的底層操作類NioByteUnsafe
,創建pipeline。
protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用於底層socket的讀寫操作
unsafe = newUnsafe();
//為channel分配獨立的pipeline用於IO事件編排
pipeline = newChannelPipeline();
}
- 在NioSocketChannelConfig的創建過程中,將NioSocketChannel的RecvByteBufAllocator類型設置為
AdaptiveRecvByteBufAllocator
。
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
在Bug修複後的版本中服務端NioServerSocketChannel的RecvByteBufAllocator類型設置為
ServerChannelRecvByteBufAllocator
最終我們得到的客戶端NioSocketChannel
結構如下:
6. ChannelRead事件的響應
在前邊介紹接收連接的整體核心流程框架的時候,我們提到main reactor線程是在一個do{.....}while(...)
迴圈read loop中不斷的調用ServerSocketChannel#accept
方法來接收客戶端的連接。
當滿足退出read loop迴圈的條件有兩個:
-
在限定的16次讀取中,已經沒有新的客戶端連接要接收了。退出迴圈。
-
從NioServerSocketChannel中讀取客戶端連接的次數達到了16次,無論此時是否還有客戶端連接都需要退出迴圈。
main reactor就會退出read loop迴圈,此時接收到的客戶端連接NioSocketChannel暫存與List<Object> readBuf
集合中。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
try {
try {
do {
........省略.........
//底層調用NioServerSocketChannel->doReadMessages 創建客戶端SocketChannel
int localRead = doReadMessages(readBuf);
........省略.........
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
........省略.........
} finally {
........省略.........
}
}
}
隨後main reactor線程會遍歷List<Object> readBuf
集合中的NioSocketChannel,併在NioServerSocketChannel的pipeline中傳播ChannelRead事件。
最終ChannelRead事件
會傳播到ServerBootstrapAcceptor
中,這裡正是Netty處理客戶端連接的核心邏輯所在。
ServerBootstrapAcceptor
主要的作用就是初始化客戶端NioSocketChannel
,並將客戶端NioSocketChannel註冊到Sub Reactor Group
中,並監聽OP_READ事件
。
在ServerBootstrapAcceptor 中會初始化客戶端NioSocketChannel的這些屬性。
比如:從Reactor組EventLoopGroup childGroup
,用於初始化NioSocketChannel
中的pipeline
用到的ChannelHandler childHandler
,以及NioSocketChannel
中的一些childOptions
和childAttrs
。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//向客戶端NioSocketChannel的pipeline中
//添加在啟動配置類ServerBootstrap中配置的ChannelHandler
child.pipeline().addLast(childHandler);
//利用配置的屬性初始化客戶端NioSocketChannel
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
/**
* 1:在Sub Reactor線程組中選擇一個Reactor綁定
* 2:將客戶端SocketChannel註冊到綁定的Reactor上
* 3:SocketChannel註冊到sub reactor中的selector上,並監聽OP_READ事件
* */
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
正是在這裡,netty會將我們在《詳細圖解Netty Reactor啟動全流程》的啟動示常式序中在ServerBootstrap中配置的客戶端NioSocketChannel的所有屬性(child首碼配置)初始化到NioSocketChannel中。
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
.option(ChannelOption.SO_BACKLOG, 100)//設置主Reactor中channel的option選項
.handler(new LoggingHandler(LogLevel.INFO))//設置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//設置從Reactor中註冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 綁定埠啟動服務,開始監聽accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
以上示例代碼中通過ServerBootstrap配置的NioSocketChannel相關屬性,會在Netty啟動並開始初始化NioServerSocketChannel
的時候將ServerBootstrapAcceptor
的創建初始化工作封裝成非同步任務
,然後在NioServerSocketChannel
註冊到Main Reactor
中成功後執行。
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
@Override
void init(Channel channel) {
................省略................
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
................省略................
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}
在經過ServerBootstrapAccptor#chanelRead回調
的處理之後,此時客戶端NioSocketChannel中pipeline的結構為:
隨後會將初始化好的客戶端NioSocketChannel向Sub Reactor Group中註冊,並監聽OP_READ事件
。
如下圖中的步驟3所示:
7. 向SubReactorGroup中註冊NioSocketChannel
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
客戶端NioSocketChannel向Sub Reactor Group註冊的流程完全和服務端NioServerSocketChannel向Main Reactor Group註冊流程一樣。