本文花了大量的篇幅介紹了Reactor整體的運行框架,並深入介紹了Reactor核心的工作模塊的具體實現邏輯。通過本文的介紹我們知道了Reactor如何輪詢註冊在其上的所有Channel上感興趣的IO事件,以及Reactor如何去處理IO就緒的事件,如何執行Netty框架中提交的非同步任務和定時任務。... ...
本系列Netty源碼解析文章基於 4.1.56.Final版本
本文筆者來為大家介紹下Netty的核心引擎Reactor的運轉架構,希望通過本文的介紹能夠讓大家對Reactor是如何驅動著整個Netty框架的運轉有一個全面的認識。也為我們後續進一步介紹Netty關於處理網路請求的整個生命周期的相關內容做一個前置知識的鋪墊,方便大家後續理解。
那麼在開始本文正式的內容之前,筆者先來帶著大家回顧下前邊文章介紹的關於Netty整個框架如何搭建的相關內容,沒有看過筆者前邊幾篇文章的讀者朋友也沒關係,這些並不會影響到本文的閱讀,只不過涉及到相關細節的部分,大家可以在回看下。
前文回顧
在《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》一文中,我們介紹了Netty服務端的核心引擎主從Reactor線程組
的創建過程以及相關核心組件里的重要屬性。在這個過程中,我們還提到了Netty對各種細節進行的優化,比如針對JDK NIO 原生Selector做的一些優化,展現了Netty對性能極致的追求。最終我們創建出瞭如下結構的Reactor。
在上篇文章《詳細圖解Netty Reactor啟動全流程》中,我們完整地介紹了Netty服務端啟動的整個流程,並介紹了在啟動過程中涉及到的ServerBootstrap相關的屬性以及配置方式。用於接收連接的服務端NioServerSocketChannel的創建和初始化過程以及其類的繼承結構。其中重點介紹了NioServerSocketChannel向Reactor的註冊過程以及Reactor線程的啟動時機和pipeline的初始化時機。最後介紹了NioServerSocketChannel綁定埠地址的整個流程。在這個過程中我們瞭解了Netty的這些核心組件是如何串聯起來的。
當Netty啟動完畢後,我們得到瞭如下的框架結構:
主Reactor線程組中管理的是NioServerSocketChannel
用於接收客戶端連接,併在自己的pipeline中的ServerBootstrapAcceptor
里初始化接收到的客戶端連接,隨後會將初始化好的客戶端連接註冊到從Reactor線程組中。
從Reactor線程組主要負責監聽處理註冊其上的所有客戶端連接的IO就緒事件。
其中一個Channel只能分配給一個固定的Reactor。一個Reactor負責處理多個Channel上的IO就緒事件,這樣可以將服務端承載的全量客戶端連接
分攤到多個Reactor
中處理,同時也能保證Channel上IO處理的線程安全性
。Reactor與Channel之間的對應關係如下圖所示:
以上內容就是對筆者前邊幾篇文章的相關內容回顧,大家能回憶起來更好,回憶不起來也沒關係,一點也不影響大家理解本文的內容。如果對相關細節感興趣的同學,可以在閱讀完本文之後,在去回看下。
我們言歸正傳,正式開始本文的內容,筆者接下來會為大家介紹這些核心組件是如何相互配合從而驅動著整個Netty Reactor框架運轉的。
當Netty Reactor框架啟動完畢後,接下來第一件事情也是最重要的事情就是如何來高效的接收客戶端的連接。
那麼在探討Netty服務端如何接收連接之前,我們需要弄清楚Reactor線程
的運行機制,它是如何監聽並處理Channel
上的IO就緒事件
的。
本文相當於是後續我們介紹Reactor線程
監聽處理ACCEPT事件
,Read事件
,Write事件
的前置篇,本文專註於講述Reactor線程
的整個運行框架。理解了本文的內容,對理解後面Reactor線程
如何處理IO事件
會大有幫助。
我們在Netty框架的創建階段
和啟動階段
無數次的提到了Reactor線程
,那麼在本文要介紹的運行階段
就該這個Reactor線程
來大顯神威了。
經過前邊文章的介紹,我們瞭解到Netty中的Reactor線程
主要乾三件事情:
-
輪詢註冊在
Reactor
上的所有Channel
感興趣的IO就緒事件
。 -
處理
Channel
上的IO就緒事件
。 -
執行Netty中的非同步任務。
正是這三個部分組成了Reactor
的運行框架,那麼我們現在來看下這個運行框架具體是怎麼運轉的~~
Reactor線程的整個運行框架
大家還記不記得筆者在《聊聊Netty那些事兒之從內核角度看IO模型》一文中提到的,IO模型的演變
是圍繞著"如何用儘可能少的線程去管理儘可能多的連接"
這一主題進行的。
Netty的IO模型
是通過JDK NIO Selector
實現的IO多路復用模型
,而Netty的IO線程模型
為主從Reactor線程模型
。
根據《聊聊Netty那些事兒之從內核角度看IO模型》一文中介紹的IO多路復用模型
我們很容易就能理解到Netty會使用一個用戶態的Reactor線程
去不斷的通過Selector
在內核態去輪訓Channel
上的IO就緒事件
。
說白了Reactor線程
其實執行的就是一個死迴圈
,在死迴圈
中不斷的通過Selector
去輪訓IO就緒事件
,如果發生IO就緒事件
則從Selector
系統調用中返回並處理IO就緒事件
,如果沒有發生IO就緒事件
則一直阻塞
在Selector
系統調用上,直到滿足Selector喚醒條件
。
以下三個條件中只要滿足任意一個條件,Reactor線程就會被從Selector上喚醒:
-
當Selector輪詢到有IO活躍事件發生時。
-
當Reactor線程需要執行的
定時任務
到達任務執行時間deadline
時。 -
當有
非同步任務
提交給Reactor時,Reactor線程需要從Selector
上被喚醒,這樣才能及時的去執行非同步任務
。
這裡可以看出Netty對
Reactor線程
的壓榨還是比較狠的,反正現在也沒有IO就緒事件
需要去處理,不能讓Reactor線程
在這裡白白等著,要立即喚醒它,轉去處理提交過來的非同步任務以及定時任務。Reactor線程
堪稱996典範
一刻不停歇地運作著。
在瞭解了Reactor線程
的大概運行框架後,我們接下來就到源碼中去看下它的核心運轉框架是如何實現出來的。
由於這塊源碼比較龐大繁雜,所以筆者先把它的運行框架提取出來,方便大家整體的理解整個運行過程的全貌。
上圖所展示的就是Reactor整個工作體系的全貌,主要分為如下幾個重要的工作模塊:
-
Reactor線程在Selector上阻塞獲取IO就緒事件。在這個模塊中首先會去檢查當前是否有非同步任務需要執行,如果有非同步需要執行,那麼不管當前有沒有IO就緒事件都不能阻塞在Selector上,隨後會去非阻塞的輪詢一下Selector上是否有IO就緒事件,如果有,正好可以和非同步任務一起執行。優先處理IO就緒事件,在執行非同步任務。
-
如果當前沒有非同步任務需要執行,那麼Reactor線程會接著查看是否有定時任務需要執行,如果有則在Selector上阻塞直到定時任務的到期時間deadline,或者滿足其他喚醒條件被喚醒。如果沒有定時任務需要執行,Reactor線程則會在Selector上一直阻塞直到滿足喚醒條件。
-
當Reactor線程滿足喚醒條件被喚醒後,首先會去判斷當前是因為有IO就緒事件被喚醒還是因為有非同步任務需要執行被喚醒或者是兩者都有。隨後Reactor線程就會去處理IO就緒事件和執行非同步任務。
-
最後Reactor線程返回迴圈起點不斷的重覆上述三個步驟。
以上就是Reactor線程運行的整個核心邏輯,下麵是筆者根據上述核心邏輯,將Reactor的整體代碼設計框架提取出來,大家可以結合上邊的Reactor工作流程圖,從總體上先感受下整個源碼實現框架,能夠把Reactor的核心處理步驟和代碼中相應的處理模塊對應起來即可,這裡不需要讀懂每一行代碼,要以邏輯處理模塊為單位理解。後面筆者會將這些一個一個的邏輯處理模塊在單獨拎出來為大家詳細介紹。
@Override
protected void run() {
//記錄輪詢次數 用於解決JDK epoll的空輪訓bug
int selectCnt = 0;
for (;;) {
try {
//輪詢結果
int strategy;
try {
//根據輪詢策略獲取輪詢結果 這裡的hasTasks()主要檢查的是普通隊列和尾部隊列中是否有非同步任務等待執行
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO不支持自旋(BUSY_WAIT)
case SelectStrategy.SELECT:
核心邏輯是有任務需要執行,則Reactor線程立馬執行非同步任務,如果沒有非同步任務執行,則進行輪詢IO事件
default:
}
} catch (IOException e) {
................省略...............
}
執行到這裡說明滿足了喚醒條件,Reactor線程從selector上被喚醒開始處理IO就緒事件和執行非同步任務
/**
* Reactor線程需要保證及時的執行非同步任務,只要有非同步任務提交,就需要退出輪詢。
* 有IO事件就優先處理IO事件,然後處理非同步任務
* */
selectCnt++;
//主要用於從IO就緒的SelectedKeys集合中剔除已經失效的selectKey
needsToSelectAgain = false;
//調整Reactor線程執行IO事件和執行非同步任務的CPU時間比例 預設50,表示執行IO事件和非同步任務的時間比例是一比一
final int ioRatio = this.ioRatio;
這裡主要處理IO就緒事件,以及執行非同步任務
需要優先處理IO就緒事件,然後根據ioRatio設置的處理IO事件CPU用時與非同步任務CPU用時比例,
來決定執行多長時間的非同步任務
//判斷是否觸發JDK Epoll BUG 觸發空輪詢
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
//既沒有IO就緒事件,也沒有非同步任務,Reactor線程從Selector上被異常喚醒 觸發JDK Epoll空輪訓BUG
//重新構建Selector,selectCnt歸零
selectCnt = 0;
}
} catch (CancelledKeyException e) {
................省略...............
} catch (Error e) {
................省略...............
} catch (Throwable t) {
................省略...............
} finally {
................省略...............
}
}
}
從上面提取出來的Reactor的源碼實現框架中,我們可以看出Reactor線程
主要做了下麵幾個事情:
- 通過
JDK NIO Selector
輪詢註冊在Reactor
上的所有Channel
感興趣的IO事件
。對於NioServerSocketChannel來說因為它主要負責接收客戶端連接所以監聽的是OP_ACCEPT事件
,對於客戶端NioSocketChannel來說因為它主要負責處理連接上的讀寫事件所以監聽的是OP_READ
和OP_WRITE
事件。
這裡需要註意的是netty只會自動註冊
OP_READ
事件,而OP_WRITE事件
是在當Socket寫入緩衝區以滿無法繼續寫入發送數據時由用戶自己註冊。
-
如果有非同步任務需要執行,則立馬停止輪詢操作,轉去執行非同步任務。這裡分為兩種情況:
-
既有
IO就緒事件
發生,也有非同步任務
需要執行。則優先處理IO就緒事件
,然後根據ioRatio
設置的執行時間比例
決定執行多長時間的非同步任務。這裡Reactor線程需要控制非同步任務的執行時間,因為Reactor線程的核心是處理IO就緒事件,不能因為非同步任務的執行而耽誤了最重要的事情。 -
沒有
IO就緒事件
發生,但是有非同步任務或者定時任務到期需要執行。則只執行非同步任務
,儘可能的去壓榨Reactor線程。沒有IO就緒事件發生也不能閑著。
這裡第二種情況下只會執行
64
個非同步任務,目的是為了防止過度
執行非同步任務,耽誤了
最重要的事情輪詢IO事件
。 -
-
在最後Netty會判斷本次
Reactor線程
的喚醒是否是由於觸發了JDK epoll 空輪詢 BUG導致的,如果觸發了該BUG,則重建Selector
。繞過JDK BUG,達到解決問題的目的。
正常情況下Reactor線程從Selector中被喚醒有兩種情況:
- 輪詢到有IO就緒事件發生。
- 有非同步任務或者定時任務需要執行。
而JDK epoll 空輪詢 BUG會在上述兩種情況都沒有發生的時候,Reactor線程
會意外的從Selector
中被喚醒,導致CPU空轉。
JDK epoll 空輪詢 BUG:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6670302
好了,Reactor線程
的總體運行結構框架我們現在已經瞭解了,下麵我們來深入到這些核心處理模塊中來各個擊破它們~~
1. Reactor線程輪詢IO就緒事件
在《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》一文中,筆者在講述主從Reactor線程組NioEventLoopGroup
的創建過程的時候,提到一個構造器參數SelectStrategyFactory
。
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
Reactor線程
最重要的一件事情就是輪詢IO就緒事件
,SelectStrategyFactory
就是用於指定輪詢策略的,預設實現為DefaultSelectStrategyFactory.INSTANCE
。
而在Reactor線程
開啟輪詢的一開始,就是用這個selectStrategy
去計算一個輪詢策略strategy
,後續會根據這個strategy
進行不同的邏輯處理。
@Override
protected void run() {
//記錄輪詢次數 用於解決JDK epoll的空輪訓bug
int selectCnt = 0;
for (;;) {
try {
//輪詢結果
int strategy;
try {
//根據輪詢策略獲取輪詢結果 這裡的hasTasks()主要檢查的是普通隊列和尾部隊列中是否有非同步任務等待執行
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO不支持自旋(BUSY_WAIT)
case SelectStrategy.SELECT:
核心邏輯是有任務需要執行,則Reactor線程立馬執行非同步任務,如果沒有非同步任務執行,則進行輪詢IO事件
default:
}
} catch (IOException e) {
................省略...............
}
................省略...............
}
下麵我們來看這個輪詢策略strategy
具體的計算邏輯是什麼樣的?
1.1 輪詢策略
public interface SelectStrategy {
/**
* Indicates a blocking select should follow.
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* Indicates the IO loop to poll for new events without blocking.
*/
int BUSY_WAIT = -3;
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
我們首先來看下Netty中定義的這三種輪詢策略:
-
SelectStrategy.SELECT:
此時沒有任何非同步任務需要執行,Reactor線程
可以安心的阻塞
在Selector
上等待IO就緒事件
的來臨。 -
SelectStrategy.CONTINUE:
重新開啟一輪IO輪詢
。 -
SelectStrategy.BUSY_WAIT:
Reactor線程進行自旋輪詢
,由於NIO 不支持自旋操作
,所以這裡直接跳到SelectStrategy.SELECT
策略。
下麵我們來看下輪詢策略
的計算邏輯calculateStrategy
:
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
/**
* Reactor線程要保證及時的執行非同步任務
* 1:如果有非同步任務等待執行,則馬上執行selectNow()非阻塞輪詢一次IO就緒事件
* 2:沒有非同步任務,則跳到switch select分支
* */
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
- 在
Reactor線程
的輪詢工作開始之前,需要首先判斷下當前是否有非同步任務
需要執行。判斷依據就是查看Reactor
中的非同步任務隊列taskQueue
和用於統計信息任務用的尾部隊列tailTask
是否有非同步任務
。
@Override
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
- 如果
Reactor
中有非同步任務
需要執行,那麼Reactor線程
需要立即執行,不能阻塞在Selector
上。在返回前需要再順帶調用selectNow()
非阻塞查看一下當前是否有IO就緒事件
發生。如果有,那麼正好可以和非同步任務
一起被處理,如果沒有,則及時地處理非同步任務
。
這裡Netty要表達的語義是:首先Reactor線程需要優先保證
IO就緒事件
的處理,然後在保證非同步任務
的及時執行。如果當前沒有IO就緒事件但是有非同步任務需要執行時,Reactor線程就要去及時執行非同步任務而不是繼續阻塞在Selector上等待IO就緒事件。
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
int selectNow() throws IOException {
//非阻塞
return selector.selectNow();
}
- 如果當前
Reactor線程
沒有非同步任務需要執行,那麼calculateStrategy
方法直接返回SelectStrategy.SELECT
也就是SelectStrategy介面
中定義的常量-1
。當calculateStrategy
方法通過selectNow()
返回非零
數值時,表示此時有IO就緒
的Channel
,返回的數值表示有多少個IO就緒
的Channel
。
@Override
protected void run() {
//記錄輪詢次數 用於解決JDK epoll的空輪訓bug
int selectCnt = 0;
for (;;) {
try {
//輪詢結果
int strategy;
try {
//根據輪詢策略獲取輪詢結果 這裡的hasTasks()主要檢查的是普通隊列和尾部隊列中是否有非同步任務等待執行
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO不支持自旋(BUSY_WAIT)
case SelectStrategy.SELECT:
核心邏輯是有任務需要執行,則Reactor線程立馬執行非同步任務,如果沒有非同步任務執行,則進行輪詢IO事件
default:
}
} catch (IOException e) {
................省略...............
}
................處理IO就緒事件以及執行非同步任務...............
}
從預設的輪詢策略我們可以看出selectStrategy.calculateStrategy
只會返回三種情況:
-
返回 -1:
switch邏輯分支進入SelectStrategy.SELECT分支
,表示此時Reactor
中沒有非同步任務
需要執行,Reactor線程
可以安心的阻塞在Selector
上等待IO就緒事件
發生。 -
返回 0:
switch邏輯分支進入default分支
,表示此時Reactor
中沒有IO就緒事件
但是有非同步任務
需要執行,流程通過default分支
直接進入了處理非同步任務
的邏輯部分。 -
返回 > 0:
switch邏輯分支進入default分支
,表示此時Reactor
中既有IO就緒事件
發生也有非同步任務
需要執行,流程通過default分支
直接進入了處理IO就緒事件
和執行非同步任務
邏輯部分。
現在Reactor
的流程處理邏輯走向我們清楚了,那麼接下來我們把重點放在SelectStrategy.SELECT分支中的輪詢邏輯上。這塊是Reactor監聽IO就緒事件的核心。
1.2 輪詢邏輯
case SelectStrategy.SELECT:
//當前沒有非同步任務執行,Reactor線程可以放心的阻塞等待IO就緒事件
//從定時任務隊列中取出即將快要執行的定時任務deadline
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// -1代表當前定時任務隊列中沒有定時任務
curDeadlineNanos = NONE; // nothing on the calendar
}
//最早執行定時任務的deadline作為 select的阻塞時間,意思是到了定時任務的執行時間
//不管有無IO就緒事件,必須喚醒selector,從而使reactor線程執行定時任務
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
//再次檢查普通任務隊列中是否有非同步任務
//沒有的話開始select阻塞輪詢IO就緒事件
strategy = select(curDeadlineNanos);
}
} finally {
// 執行到這裡說明Reactor已經從Selector上被喚醒了
// 設置Reactor的狀態為蘇醒狀態AWAKE
// lazySet優化不必要的volatile操作,不使用記憶體屏障,不保證寫操作的可見性(單線程不需要保證)
nextWakeupNanos.lazySet(AWAKE);
}
流程走到這裡,說明現在Reactor
上沒有任何事情可做,可以安心的阻塞
在Selector
上等待IO就緒事件
到來。
那麼Reactor線程
到底應該在Selector
上阻塞多久呢??
在回答這個問題之前,我們在回顧下《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》一文中在講述Reactor的創建
時提到,Reactor線程
除了要輪詢Channel
上的IO就緒事件
,以及處理IO就緒事件
外,還有一個任務就是負責執行Netty框架中的非同步任務
。
而Netty框架中的非同步任務
分為三類:
-
存放在普通任務隊列
taskQueue
中的普通非同步任務。 -
存放在尾部隊列
tailTasks
中的用於執行統計任務等收尾動作的尾部任務。 -
還有一種就是這裡即將提到的
定時任務
。存放在Reactor
中的定時任務隊列scheduledTaskQueue
中。
從ReactorNioEventLoop類
中的繼承結構我們也可以看出,Reactor
具備執行定時任務的能力。
既然Reactor
需要執行定時任務,那麼它就不能一直阻塞
在Selector
上無限等待IO就緒事件
。
那麼我們回到本小節一開始提到的問題上,為了保證Reactor
能夠及時地執行定時任務
,Reactor線程
需要在即將要執行的的第一個定時任務deadline
到達之前被喚醒。
所以在Reactor
線程開始輪詢IO就緒事件
之前,我們需要首先計算出來Reactor線程
在Selector
上的阻塞超時時間。
1.2.1 Reactor的輪詢超時時間
首先我們需要從Reactor
的定時任務隊列scheduledTaskQueue
中取出即將快要執行的定時任務deadline
。將這個deadline
作為Reactor線程
在Selector
上輪詢的超時時間。這樣可以保證在定時任務即將要執行時,Reactor現在可以及時的從Selector上被喚醒。
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
// nextWakeupNanos is:
// AWAKE when EL is awake
// NONE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// -1代表當前定時任務隊列中沒有定時任務
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
protected final long nextScheduledTaskDeadlineNanos() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
}
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
}
}
nextScheduledTaskDeadlineNanos
方法會返回當前Reactor
定時任務隊列中最近的一個定時任務deadline
時間點,如果定時任務隊列中沒有定時任務,則返回-1
。
NioEventLoop
中nextWakeupNanos
變數用來存放Reactor從Selector
上被喚醒的時間點,設置為最近需要被執行定時任務的deadline
,如果當前並沒有定時任務需要執行,那麼就設置為Long.MAX_VALUE
一直阻塞,直到有IO就緒事件
到達或者有非同步任務
需要執行。
1.2.2 Reactor開始輪詢IO就緒事件
if (!hasTasks()) {
//再次檢查普通任務隊列中是否有非同步任務, 沒有的話 開始select阻塞輪詢IO就緒事件
strategy = select(curDeadlineNanos);
}
在Reactor線程
開始阻塞
輪詢IO就緒事件
之前還需要再次檢查一下是否有非同步任務
需要執行。
如果此時恰巧有非同步任務
提交,就需要停止IO就緒事件
的輪詢,轉去執行非同步任務
。如果沒有非同步任務
,則正式開始輪詢IO就緒事件
。
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
//無定時任務,無普通任務執行時,開始輪詢IO就緒事件,沒有就一直阻塞 直到喚醒條件成立
return selector.select();
}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
如果deadlineNanos == NONE
,經過上小節的介紹,我們知道NONE
表示當前Reactor
中並沒有定時任務,所以可以安心的阻塞
在Selector
上等待IO就緒事件
到來。
selector.select()
調用是一個阻塞調用,如果沒有IO就緒事件
,Reactor線程
就會一直阻塞在這裡直到IO就緒事件
到來。這裡占時不考慮前邊提到的JDK NIO Epoll的空輪詢BUG
.
讀到這裡那麼問題來了,此時Reactor線程
正阻塞在selector.select()
調用上等待IO就緒事件
的到來,如果此時正好有非同步任務
被提交到Reactor
中需要執行,並且此時無任何IO就緒事件
,而Reactor線程
由於沒有IO就緒事件
到來,會繼續在這裡阻塞,那麼如何去執行非同步任務
呢??
解鈴還須系鈴人,既然非同步任務
在被提交後希望立馬得到執行,那麼就在提交非同步任務
的時候去喚醒Reactor線程
。
//addTaskWakesUp = true 表示 當且僅當只有調用addTask方法時 才會喚醒Reactor線程
//addTaskWakesUp = false 表示 並不是只有addTask方法才能喚醒Reactor 還有其他方法可以喚醒Reactor 預設設置false
private final boolean addTaskWakesUp;
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
//如果當前線程不是Reactor線程,則啟動Reactor線程
//這裡可以看出Reactor線程的啟動是通過 向NioEventLoop添加非同步任務時啟動的
startThread();
.....................省略...................
}
if (!addTaskWakesUp && immediate) {
//io.netty.channel.nio.NioEventLoop.wakeup
wakeup(inEventLoop);
}
}
對於execute方法
我想大家一定不會陌生,在上篇文章《詳細圖解Netty Reactor啟動全流程》中我們在介紹Reactor線程的啟動
時介紹過該方法。
在啟動過程中涉及到的重要操作Register操作
,Bind操作
都需要封裝成非同步任務
通過該方法提交到Reactor
中執行。
這裡我們將重點放在execute方法
後半段wakeup
邏輯部分。
我們先介紹下和wakeup
邏輯相關的兩個參數boolean immediate
和boolean addTaskWakesUp
。
-
immediate:
表示提交的task
是否需要被立即執行。Netty中只要你提交的任務類型不是LazyRunnable
類型的任務,都是需要立即執行的。immediate = true
-
addTaskWakesUp :
true
表示當且僅當只有
調用addTask
方法時才會喚醒Reactor線程
。調用別的方法並不會喚醒Reactor線程
。
在初始化NioEventLoop
時會設置為false
,表示並不是只有
addTask方法才能喚醒Reactor線程
還有其他方法可以喚醒Reactor線程
,比如這裡的execute方法
就會喚醒Reactor線程
。
針對execute方法中的這個喚醒條件!addTaskWakesUp && immediate
,netty這裡要表達的語義是:當immediate參數為true的時候表示該非同步任務需要立即執行,addTaskWakesUp 預設設置為false 表示不僅只有addTask方法可以喚醒Reactor,還有其他方法比如這裡的execute方法也可以喚醒。但是當設置為true時,語義就變為只有addTask才可以喚醒Reactor,即使execute方法里的immediate = true也不能喚醒Reactor,因為執行的是execute方法而不是addTask方法。
private static final long AWAKE = -1L;
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
//將Reactor線程從Selector上喚醒
selector.wakeup();
}
}
當nextWakeupNanos = AWAKE
時表示當前Reactor正處於蘇醒狀態,既然是蘇醒狀態也就沒有必要去執行 selector.wakeup()
重覆喚醒Reactor了,同時也能省去這一次的系統調用開銷。
在《1.2小節 輪詢邏輯》開始介紹的源碼實現框架里Reactor被喚醒之後執行代碼會進入finally{...}
語句塊中,在那裡會將nextWakeupNanos
設置為AWAKE
。
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// 執行到這裡說明Reactor已經從Selector上被喚醒了
// 設置Reactor的狀態為蘇醒狀態AWAKE
// lazySet優化不必要的volatile操作,不使用記憶體屏障,不保證寫操作的可見性(單線程不需要保證)
nextWakeupNanos.lazySet(AWAKE);
}
這裡Netty用了一個
AtomicLong類型
的變數nextWakeupNanos
,既能表示當前Reactor線程
的狀態,又能表示Reactor線程
的阻塞超時時間。我們在日常開發中也可以學習下這種技巧。
我們繼續回到Reactor線程
輪詢IO就緒事件
的主線上。
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
//無定時任務,無普通任務執行時,開始輪詢IO就緒事件,沒有就一直阻塞 直到喚醒條件成立
return selector.select();
}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
當deadlineNanos
不為NONE
,表示此時Reactor
有定時任務
需要執行,Reactor線程
需要阻塞在Selector
上等待IO就緒事件
直到最近的一個定時任務執行時間點deadline
到達。
這裡的deadlineNanos
表示的就是Reactor
中最近的一個定時任務執行時間點deadline
,單位是納秒
。指的是一個絕對時間
。
而我們需要計算的是Reactor線程
阻塞在Selector
的超時時間timeoutMillis
,單位是毫秒
,指的是一個相對時間
。
所以在Reactor線程
開始阻塞在Selector
上之前,我們需要將這個單位為納秒
的絕對時間deadlineNanos
轉化為單位為毫秒
的相對時間timeoutMillis
。
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
//無定時任務,無普通任務執行時,開始輪詢IO就緒事件,沒有就一直阻塞 直到喚醒條件成立
return selector.select();
}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
這裡大家可能會好奇,通過deadlineToDelayNanos方法
計算timeoutMillis
的時候,為什麼要給deadlineNanos
在加上0.995毫秒
呢??
大家想象一下這樣的場景,當最近的一個定時任務的deadline
即將在5微秒
內到達,那麼這時將納秒轉換成毫秒計算出的timeoutMillis
會是0
。
而在Netty中timeoutMillis = 0
要表達的語義是:定時任務執行時間已經到達deadline
時間點,需要被執行。
而現實情況是定時任務還有5微秒
才能夠到達deadline
,所以對於這種情況,需要在deadlineNanos
在加上0.995毫秒
湊成1毫秒
不能讓其為0。
所以從這裡我們可以看出,
Reactor
在有定時任務的情況下,至少要阻塞1毫秒
。
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
protected static long deadlineToDelayNanos(long deadlineNanos) {
return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
}
}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
static long deadlineToDelayNanos(long deadlineNanos) {
return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
}
//啟動時間點
private static final long START_TIME = System.nanoTime();
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
static long deadlineNanos(long delay) {
//計算定時任務執行deadline 去除啟動時間
long deadlineNanos = nanoTime() + delay;
// Guard against overflow
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}
}
這裡需要註意一下,在創建定時任務時會通過deadlineNanos方法
計算定時任務的執行deadline
,deadline
的計算邏輯是當前時間點
+任務延時delay
-系統啟動時間
。這裡需要扣除系統啟動的時間。
所以這裡在通過deadline
計算延時delay
(也就是timeout)的時候需要在加上系統啟動的時間
: deadlineNanos - nanoTime()
當通過deadlineToDelayNanos
計算出的timeoutMillis <= 0
時,表示Reactor
目前有臨近的定時任務
需要執行,這時候就需要立馬返回,不能阻塞在Selector
上影響定時任務
的執行。當然在返回執行定時任務
前,需要在順手通過selector.selectNow()
非阻塞輪詢一下Channel
上是否有IO就緒事件
到達,防止耽誤IO事件
的處理。真是操碎了心~~
當timeoutMillis > 0
時,Reactor線程
就可以安心的阻塞在Selector
上等待IO事件
的到來,直到timeoutMillis
超時時間到達。
timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis)
當註冊在Reactor
上的Channel
中有IO事件
到來時,Reactor線程
就會從selector.select(timeoutMillis)
調用中喚醒,立即去處理IO就緒事件
。
這裡假設一種極端情況,如果最近的一個定時任務的deadline是在未來很遠的一個時間點,這樣就會使timeoutMillis的時間非常非常久,那麼Reactor豈不是會一直阻塞在Selector上造成 Netty 無法工作?
筆者覺得大家現在心裡應該已經有了答案,我們在《1.2.2 Reactor開始輪詢IO就緒事件》小節一開始介紹過,當Reactor正在Selector上阻塞時,如果此時用戶線程向Reactor提交了非同步任務,Reactor線程會通過execute方法被喚醒。
流程到這裡,Reactor中最重要也是最核心的邏輯:輪詢Channel
上的IO就緒事件
的處理流程我們就講解完了。
當Reactor輪詢到有IO活躍事件或者有非同步任務需要執行時,就會從Selector上被喚醒,下麵就到了該介紹Reactor被喚醒之後是如何處理IO就緒事件
以及如何執行非同步任務
的時候了。
Netty畢竟是一個網路框架,所以它會優先去處理Channel
上的IO事件
,基於這個事實,所以Netty不會容忍非同步任務
被無限制的執行從而影響IO吞吐
。
Netty通過ioRatio變數
來調配Reactor線程
在處理IO事件
和執行非同步任務
之間的CPU時間
分配比例。
下麵我們就來看下這個執行時間比例的分配邏輯是什麼樣的~~~
2. Reactor處理IO與處理非同步任務的時間比例分配
無論什麼時候,當有IO就緒事件
到來時,Reactor
都需要保證IO事件
被及時完整的處理完,而ioRatio
主要限制的是執行非同步任務
所需用時,防止Reactor線程
處理非同步任務
時間過長而導致 I/O 事件
得不到及時地處理。
//調整Reactor線程執行IO事件和執行非同步任務的CPU時間比例 預設50,表示執行IO事件和非同步任務的時間比例是一比一
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) { //先一股腦執行IO事件,在一股腦執行非同步任務(無時間限制)
try {
if (strategy > 0) {
//如果有IO就緒事件 則處理IO就緒事件
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
//處理所有非同步任務
ranTasks = runAllTasks();
}
} else if (strategy > 0) {//先執行IO事件 用時ioTime 執行非同步任務只能用時ioTime * (100 - ioRatio) / ioRatio
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// 限定在超時時間內 處理有限的非同步任務 防止Reactor線程處理非同步任務時間過長而導致 I/O 事件阻塞
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else { //沒有IO就緒事件處理,則只執行非同步任務 最多執行64個 防止Reactor線程處理非同步任務時間過長而導致 I/O 事件阻塞
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
- 當
ioRatio = 100
時,表示無需考慮執行時間的限制,當有IO就緒事件
時(strategy > 0
)Reactor線程
需要優先處理IO就緒事件
,處理完IO事件
後,執行所有的非同步任務
包括:普通任務,尾部任務,定時任務。無時間限制。
strategy
的數值表示IO就緒
的Channel
個數。它是前邊介紹的io.netty.channel.nio.NioEventLoop#select
方法的返回值。
- 當
ioRatio
設置的值不為100
時,預設為50
。需要先統計出執行IO事件
的用時ioTime
,根據ioTime * (100 - ioRatio) / ioRatio
計算出,後面執行非同步任務
的限制時間。也就是說Reactor線程
需要在這個限定的時間內,執行有限的非同步任務,防止Reactor線程
由於處理非同步任務
時間過長而導致I/O 事件
得不到及時地處理。
預設情況下,執行
IO事件
用時和執行非同步任務
用時比例設置的是一比一。
ioRatio
設置的越高,則Reactor線程
執行非同步任務的時間占比越小
。
要想得到Reactor線程
執行非同步任務
所需的時間限制,必須知道執行IO事件
的用時ioTime
然後在根據ioRatio
計算出執行非同步任務
的時間限制。
那如果此時並沒有IO就緒事件
需要Reactor線程
處理的話,這種情況下我們無法得到ioTime
,那怎麼得到執行非同步任務
的限制時間呢??
在這種特殊情況下,Netty只允許Reactor線程
最多執行64
個非同步任務,然後就結束執行。轉去繼續輪訓IO就緒事件
。核心目的還是防止Reactor線程
由於處理非同步任務
時間過長而導致I/O 事件
得不到及時地處理。
預設情況下,當
Reactor
有非同步任務
需要處理但是沒有IO就緒事件
時,Netty只會允許Reactor線程
執行最多64
個非同步任務。
現在我們對Reactor
處理IO事件
和非同步任務
的整體框架已經瞭解了,下麵我們就來分別介紹下Reactor線程
在處理IO事件
和非同步任務
的具體邏輯是什麼樣的?
3. Reactor線程處理IO就緒事件
//該欄位為持有selector對象selectedKeys的引用,當IO事件就緒時,直接從這裡獲取
private SelectedSelectionKeySet selectedKeys;
private void processSelectedKeys() {
//是否採用netty優化後的selectedKey集合類型 是由變數DISABLE_KEY_SET_OPTIMIZATION決定的 預設為false
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
看到這段代碼大家眼熟嗎??
不知大家還記不記得我們在《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》一文中介紹Reactor NioEventLoop類
在創建Selector
的過程中提到,出於對JDK NIO Selector
中selectedKeys 集合
的插入
和遍歷
操作性能的考慮Netty將自己用數組實現的SelectedSelectionKeySet 集合
替換掉了JDK NIO Selector
中selectedKeys
的HashSet
實現。
public abstract class SelectorImpl extends AbstractSelector {
// The set of keys with data ready for an operation
// //IO就緒的SelectionKey(裡面包裹著channel)
protected Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
//註冊在該Selector上的所有SelectionKey(裡面包裹著channel)
protected HashSet<SelectionKey> keys;
...............省略...................
}
Netty中通過優化開關DISABLE_KEY_SET_OPTIMIZATION
控制是否對JDK NIO Selector
進行優化。預設是需要優化。
在優化開關開啟的情況下,Netty會將創建的SelectedSelectionKeySet 集合
保存在NioEventLoop
的private SelectedSelectionKeySet selectedKeys
欄位中,方便Reactor線程
直接從這裡獲取IO就緒
的SelectionKey
。
在優化開關關閉的情況下,Netty會直接採用JDK NIO Selector
的預設實現。此時NioEventLoop
的selectedKeys
欄位就會為null
。
忘記這段的同學可以在回顧下《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》一文中關於
Reactor
的創建過程。
經過對前邊內容的回顧,我們看到了在Reactor
處理IO就緒事件
的邏輯也分為兩個部分,一個是經過Netty優化的,一個是採用JDK 原生
的。
我們先來看採用JDK 原生
的Selector
的處理方式,理解了這種方式,在看Netty優化的方式會更加容易。
3.1 processSelectedKeysPlain
我們在《聊聊Netty那些事兒之Reactor在Netty中的實現(創建篇)》一文中介紹JDK NIO Selector
的工作過程時講過,當註冊在Selector
上的Channel
發生IO就緒事件
時,Selector
會將IO就緒
的SelectionKey
插入到Set<SelectionKey> selectedKeys
集合中。
這時Reactor線程
會從java.nio.channels.Selector#select(long)
調用中返回。隨後調用java.nio.channels.Selector#selectedKeys
獲取IO就緒
的SelectionKey
集合。
所以Reactor線程
在調用processSelectedKeysPlain方法
處理IO就緒事件
之前需要調用selector.selectedKeys()
去獲取所有IO就緒
的SelectionKeys
。
processSelectedKeysPlain(selector.selectedKeys())
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
//註意每次迭代末尾的keyIterator.remove()調用。Selector不會自己從已選擇鍵集中移除SelectionKey實例。
//必須在處理完通道時自己移除。下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中。
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
//目的是再次進入for迴圈 移除失效的selectKey(socketChannel可能從selector上移除)
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
3.1.1 獲取IO就緒的Channel
Set<SelectionKey> selectedKeys
集合裡面裝的全部是IO就緒
的SelectionKey
,註意,此時Set<SelectionKey> selectedKeys
的實現類型為HashSet類型
。因為我們這裡首先介紹的是JDK NIO 原生實現。
通過獲取HashSet
的迭代器,開始逐個處理IO就緒
的Channel
。
Iterator<SelectionKey> i = selectedKeys.iterator();
final SelectionKey k = i.next();
final Object a = k.attachment();
大家還記得這個SelectionKey
中的attachment屬性
里存放的是什麼嗎??
在上篇文章《詳細圖解Netty Reactor啟動全流程》中我們在講NioServerSocketChannel
向Main Reactor
註冊的時候,通過this指針將自己作為SelectionKey
的attachment屬性
註冊到Selector
中。這一步完成了Netty自定義Channel
和JDK NIO Channel
的綁定。
public abstract class AbstractNioChannel extends AbstractChannel {
//channel註冊到Selector後獲得的SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...............省略....................
}
}
}
}
而我們也提到SelectionKey
就相當於是Channel
在Selector
中的一種表示,當Channel
上有IO就緒事件
時,Selector
會將Channel
對應的SelectionKey
返回給Reactor線程
,我們可以通過返回的這個SelectionKey
里的attachment屬性
獲取到對應的Netty自定義Channel
。
對於客戶端連接事件(
OP_ACCEPT
)活躍時,這裡的Channel類型
為NioServerSocketChannel
。
對於客戶端讀寫事件(Read
,Write
)活躍時,這裡的Channel類型
為NioSocketChannel
。
當我們通過k.attachment()
獲取到Netty自定義的Channel
時,就需要把這個Channel
對應的SelectionKey
從Selector
的就緒集合Set<SelectionKey> selectedKeys