本文介紹了Netty對各種IO模型的支持以及如何輕鬆切換各種IO模型。還花了大量的篇幅介紹Netty服務端的核心引擎主從Reactor線程組的創建過程。在這個過程中,我們還提到了Netty對各種細節進行的優化,展現了Netty對性能極致的追求。 ...
本系列Netty源碼解析文章基於 4.1.56.Final版本
在上篇文章《聊聊Netty那些事兒之從內核角度看IO模型》中我們花了大量的篇幅來從內核角度詳細講述了五種IO模型
的演進過程以及ReactorIO線程模型
的底層基石IO多路復用技術在內核中的實現原理。
最後我們引出了netty中使用的主從Reactor IO線程模型。
通過上篇文章的介紹,我們已經清楚了在IO調用的過程中內核幫我們搞了哪些事情,那麼俗話說的好內核領進門,修行在netty
,netty在用戶空間又幫我們搞了哪些事情?
那麼從本文開始,筆者將從源碼角度來帶大家看下上圖中的Reactor IO線程模型
在Netty中是如何實現的。
本文作為Reactor在Netty中實現系列文章中的開篇文章,筆者先來為大家介紹Reactor的骨架是如何創建出來的。
在上篇文章中我們提到Netty採用的是主從Reactor多線程
的模型,但是它在實現上又與Doug Lea在Scalable IO in Java論文中提到的經典主從Reactor多線程模型
有所差異。
Netty中的Reactor
是以Group
的形式出現的,主從Reactor
在Netty中就是主從Reactor組
,每個Reactor Group
中會有多個Reactor
用來執行具體的IO任務
。當然在netty中Reactor
不只用來執行IO任務
,這個我們後面再說。
Main Reactor Group
中的Reactor
數量取決於服務端要監聽的埠個數,通常我們的服務端程式只會監聽一個埠,所以Main Reactor Group
只會有一個Main Reactor
線程來處理最重要的事情:綁定埠地址
,接收客戶端連接
,為客戶端創建對應的SocketChannel
,將客戶端SocketChannel分配給一個固定的Sub Reactor
。也就是上篇文章筆者為大家舉的例子,飯店最重要的工作就是先把客人迎接進來。“我家大門常打開,開放懷抱等你,擁抱過就有了默契你會愛上這裡......”
Sub Reactor Group
里有多個Reactor
線程,Reactor
線程的個數可以通過系統參數-D io.netty.eventLoopThreads
指定。預設的Reactor
的個數為CPU核數 * 2
。Sub Reactor
線程主要用來輪詢客戶端SocketChannel上的IO就緒事件
,處理IO就緒事件
,執行非同步任務
。Sub Reactor Group
做的事情就是上篇飯店例子中服務員的工作,客人進來了要為客人分配座位,端茶送水,做菜上菜。“不管遠近都是客人,請不用客氣,相約好了在一起,我們歡迎您......”
一個
客戶端SocketChannel
只能分配給一個固定的Sub Reactor
。一個Sub Reactor
負責處理多個客戶端SocketChannel
,這樣可以將服務端承載的全量客戶端連接
分攤到多個Sub Reactor
中處理,同時也能保證客戶端SocketChannel上的IO處理的線程安全性
。
由於文章篇幅的關係,作為Reactor在netty中實現的第一篇我們主要來介紹主從Reactor Group
的創建流程,骨架脈絡先搭好。
下麵我們來看一段Netty服務端代碼的編寫模板,從代碼模板的流程中我們來解析下主從Reactor的創建流程以及在這個過程中所涉及到的Netty核心類。
Netty服務端代碼模板
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
.option(ChannelOption.SO_BACKLOG, 100)//設置主Reactor中channel的option選項
.handler(new LoggingHandler(LogLevel.INFO))//設置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//設置從Reactor中註冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 綁定埠啟動服務,開始監聽accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- 首先我們要創建Netty最核心的部分 ->
創建主從Reactor Group
,在Netty中EventLoopGroup
就是Reactor Group
的實現類。對應的EventLoop
就是Reactor
的實現類。
//創建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
- 創建用於
IO處理
的ChannelHandler
,實現相應IO事件
的回調函數,編寫對應的IO處理
邏輯。註意這裡只是簡單示例哈,詳細的IO事件處理,筆者會單獨開一篇文章專門講述。
final EchoServerHandler serverHandler = new EchoServerHandler();
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
................省略IO處理邏輯................
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
-
創建
ServerBootstrap
Netty服務端啟動類,併在啟動類中配置啟動Netty服務端所需要的一些必備信息。-
通過
serverBootstrap.group(bossGroup, workerGroup)
為Netty服務端配置主從Reactor Group
實例。 -
通過
serverBootstrap.channel(NioServerSocketChannel.class)
配置Netty服務端的ServerSocketChannel
用於綁定埠地址
以及創建客戶端SocketChannel
。Netty中的NioServerSocketChannel.class
就是對JDK NIO中ServerSocketChannel
的封裝。而用於表示客戶端連接
的NioSocketChannel
是對JDK NIOSocketChannel
封裝。
在上篇文章介紹
Socket內核結構
小節中我們提到,在編寫服務端網路程式時,我們首先要創建一個Socket
用於listen和bind
埠地址,我們把這個叫做監聽Socket
,這裡對應的就是NioServerSocketChannel.class
。當客戶端連接完成三次握手,系統調用accept
函數會基於監聽Socket
創建出來一個新的Socket
專門用於與客戶端之間的網路通信我們稱為客戶端連接Socket
,這裡對應的就是NioSocketChannel.class
-
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)
設置服務端ServerSocketChannel
中的SocketOption
。關於SocketOption
的選項我們後邊的文章再聊,本文主要聚焦在NettyMain Reactor Group
的創建及工作流程。 -
serverBootstrap.handler(....)
設置服務端NioServerSocketChannel
中對應Pipieline
中的ChannelHandler
。
netty有兩種
Channel類型
:一種是服務端用於監聽綁定埠地址的NioServerSocketChannel
,一種是用於客戶端通信的NioSocketChannel
。每種Channel類型實例
都會對應一個PipeLine
用於編排對應channel實例
上的IO事件處理邏輯。PipeLine
中組織的就是ChannelHandler
用於編寫特定的IO處理邏輯。註意
serverBootstrap.handler
設置的是服務端NioServerSocketChannel PipeLine
中的ChannelHandler
。serverBootstrap.childHandler(ChannelHandler childHandler)
用於設置客戶端NioSocketChannel
中對應Pipieline
中的ChannelHandler
。我們通常配置的編碼解碼器就是在這裡。
ServerBootstrap
啟動類方法帶有child
首碼的均是設置客戶端NioSocketChannel
屬性的。ChannelInitializer
是用於當SocketChannel
成功註冊到綁定的Reactor
上後,用於初始化該SocketChannel
的Pipeline
。它的initChannel
方法會在註冊成功後執行。這裡只是捎帶提一下,讓大家有個初步印象,後面我會專門介紹。 -
-
ChannelFuture f = serverBootstrap.bind(PORT).sync()
這一步會是下篇文章要重點分析的主題Main Reactor Group
的啟動,綁定埠地址,開始監聽客戶端連接事件(OP_ACCEPT
)。本文我們只關註創建流程。 -
f.channel().closeFuture().sync()
等待服務端NioServerSocketChannel
關閉。Netty服務端到這裡正式啟動,並準備好接受客戶端連接的準備。 -
shutdownGracefully
優雅關閉主從Reactor線程組
里的所有Reactor線程
。
Netty對IO模型的支持
在上篇文章中我們介紹了五種IO模型
,Netty中支持BIO
,NIO
,AIO
以及多種操作系統下的IO多路復用技術
實現。
在Netty中切換這幾種IO模型
也是非常的方便,下麵我們來看下Netty如何對這幾種IO模型進行支持的。
首先我們介紹下幾個與IO模型
相關的重要介面:
EventLoop
EventLoop
就是Netty中的Reactor
,可以說它就是Netty的引擎,負責Channel上IO就緒事件的監聽
,IO就緒事件的處理
,非同步任務的執行
驅動著整個Netty的運轉。
不同IO模型
下,EventLoop
有著不同的實現,我們只需要切換不同的實現類就可以完成對NettyIO模型
的切換。
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoop | NioEventLoop | AioEventLoop |
在NIO模型
下Netty會自動
根據操作系統以及版本的不同選擇對應的IO多路復用技術實現
。比如Linux 2.6版本以上用的是Epoll
,2.6版本以下用的是Poll
,Mac下採用的是Kqueue
。
其中Linux kernel 在5.1版本引入的非同步IO庫io_uring正在netty中孵化。
EventLoopGroup
Netty中的Reactor
是以Group
的形式出現的,EventLoopGroup
正是Reactor組
的介面定義,負責管理Reactor
,Netty中的Channel
就是通過EventLoopGroup
註冊到具體的Reactor
上的。
Netty的IO線程模型是主從Reactor多線程模型
,主從Reactor線程組
在Netty源碼中對應的其實就是兩個EventLoopGroup
實例。
不同的IO模型
也有對應的實現:
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
ServerSocketChannel
用於Netty服務端使用的ServerSocketChannel
,對應於上篇文章提到的監聽Socket
,負責綁定監聽埠地址,接收客戶端連接並創建用於與客戶端通信的SocketChannel
。
不同的IO模型
下的實現:
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
SocketChannel
用於與客戶端通信的SocketChannel
,對應於上篇文章提到的客戶端連接Socket
,當客戶端完成三次握手後,由系統調用accept
函數根據監聽Socket
創建。
不同的IO模型
下的實現:
BIO | NIO | AIO |
---|---|---|
OioSocketChannel | NioSocketChannel | AioSocketChannel |
我們看到在不同IO模型
的實現中,Netty這些圍繞IO模型
的核心類只是首碼的不同:
- BIO對應的首碼為
Oio
表示old io
,現在已經廢棄不推薦使用。 - NIO對應的首碼為
Nio
,正是Netty推薦也是我們常用的非阻塞IO模型
。 - AIO對應的首碼為
Aio
,由於Linux下的非同步IO
機制實現的並不成熟,性能提升表現上也不明顯,現已被刪除。
我們只需要將IO模型
的這些核心介面對應的實現類首碼
改為對應IO模型
的首碼,就可以輕鬆在Netty中完成對IO模型
的切換。
多種NIO的實現
Common | Linux | Mac |
---|---|---|
NioEventLoopGroup | EpollEventLoopGroup | KQueueEventLoopGroup |
NioEventLoop | EpollEventLoop | KQueueEventLoop |
NioServerSocketChannel | EpollServerSocketChannel | KQueueServerSocketChannel |
NioSocketChannel | EpollSocketChannel | KQueueSocketChannel |
我們通常在使用NIO模型
的時候會使用Common列
下的這些IO模型
核心類,Common類
也會根據操作系統的不同自動選擇JDK
在對應平臺下的IO多路復用技術
的實現。
而Netty自身也根據操作系統的不同提供了自己對IO多路復用技術
的實現,比JDK
的實現性能更優。比如:
JDK
的 NIO預設
實現是水平觸發
,Netty 是邊緣觸發(預設)
和水平觸發可切換。。- Netty 實現的垃圾回收更少、性能更好。
我們編寫Netty服務端程式的時候也可以根據操作系統的不同,採用Netty自身的實現來進一步優化程式。做法也很簡單,直接將上圖中紅框里的實現類替換成Netty的自身實現類即可完成切換。
經過以上對Netty服務端代碼編寫模板以及IO模型
相關核心類的簡單介紹,我們對Netty的創建流程有了一個簡單粗略的總體認識,下麵我們來深入剖析下創建流程過程中的每一個步驟以及這個過程中涉及到的核心類實現。
以下源碼解析部分我們均採用Common列
下NIO
相關的實現進行解析。
創建主從Reactor線程組
在Netty服務端程式編寫模板的開始,我們首先會創建兩個Reactor線程組:
-
一個是主Reactor線程組
bossGroup
用於監聽客戶端連接,創建客戶端連接NioSocketChannel
,並將創建好的客戶端連接NioSocketChannel
註冊到從Reactor線程組中一個固定的Reactor
上。 -
一個是從Reactor線程組
workerGroup
,workerGroup
中的Reactor
負責監聽綁定在其上的客戶端連接NioSocketChannel
上的IO就緒事件
,並處理IO就緒事件
,執行非同步任務
。
//創建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Netty中Reactor線程組的實現類為NioEventLoopGroup
,在創建bossGroup
和workerGroup
的時候用到了NioEventLoopGroup
的兩個構造函數:
- 帶
nThreads
參數的構造函數public NioEventLoopGroup(int nThreads)
。 - 不帶
nThreads
參數的預設
構造函數public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
......................省略...........................
}
nThreads
參數表示當前要創建的Reactor線程組
內包含多少個Reactor線程
。不指定nThreads
參數的話採用預設的Reactor線程
個數,用0
表示。
最終會調用到構造函數
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
下麵簡單介紹下構造函數中這幾個參數的作用,後面我們在講解本文主線的過程中還會提及這幾個參數,到時在詳細介紹,這裡只是讓大家有個初步印象,不必做過多的糾纏。
Executor executor:
負責啟動Reactor線程
進而Reactor才可以開始工作。
Reactor線程組
NioEventLoopGroup
負責創建Reactor線程
,在創建的時候會將executor
傳入。
-
RejectedExecutionHandler:
當向Reactor
添加非同步任務添加失敗時,採用的拒絕策略。Reactor的任務不只是監聽IO活躍事件和IO任務的處理,還包括對非同步任務的處理。這裡大家只需有個這樣的概念,後面筆者會專門詳細介紹。 -
SelectorProvider selectorProvider:
Reactor中的IO模型為IO多路復用模型
,對應於JDK NIO中的實現為java.nio.channels.Selector
(就是我們上篇文章中提到的select,poll,epoll
),每個Reator中都包含一個Selector
,用於輪詢
註冊在該Reactor上的所有Channel
上的IO事件
。SelectorProvider
就是用來創建Selector
的。 -
SelectStrategyFactory selectStrategyFactory:
Reactor最重要的事情就是輪詢
註冊其上的Channel
上的IO就緒事件
,這裡的SelectStrategyFactory
用於指定輪詢策略
,預設為DefaultSelectStrategyFactory.INSTANCE
。
最終會將這些參數交給NioEventLoopGroup
的父類構造器,下麵我們來看下NioEventLoopGroup類
的繼承結構:
NioEventLoopGroup類
的繼承結構乍一看比較複雜,大家不要慌,筆者會隨著主線的深入慢慢地介紹這些父類介面,我們現在重點關註Mutithread
首碼的類。
我們知道NioEventLoopGroup
是Netty中的Reactor線程組
的實現,既然是線程組那麼肯定是負責管理和創建多個Reactor線程的
,所以Mutithread
首碼的類定義的行為自然是對Reactor線程組
內多個Reactor線程
的創建和管理工作。
MultithreadEventLoopGroup
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
//預設Reactor個數
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...................省略.....................
}
MultithreadEventLoopGroup類
主要的功能就是用來確定Reactor線程組
內Reactor
的個數。
預設的Reactor
的個數存放於欄位DEFAULT_EVENT_LOOP_THREADS
中。
從static {}
靜態代碼塊中我們可以看出預設Reactor
的個數的獲取邏輯:
-
可以通過系統變數
-D io.netty.eventLoopThreads"
指定。 -
如果不指定,那麼預設的就是
NettyRuntime.availableProcessors() * 2
當nThread
參數設置為0
採用預設設置時,Reactor線程組
內的Reactor
個數則設置為DEFAULT_EVENT_LOOP_THREADS
。
MultithreadEventExecutorGroup
MultithreadEventExecutorGroup
這裡就是本小節的核心,主要用來定義創建和管理Reactor
的行為。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//Reactor線程組中的Reactor集合
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
//從Reactor group中選擇一個特定的Reactor的選擇策略 用於channel註冊綁定到一個固定的Reactor上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
............................省略................................
}
首先介紹一個新的構造器參數EventExecutorChooserFactory chooserFactory
。當客戶端連接完成三次握手後,Main Reactor
會創建客戶端連接NioSocketChannel
,並將其綁定到Sub Reactor Group
中的一個固定Reactor
,那麼具體要綁定到哪個具體的Sub Reactor
上呢?這個綁定策略就是由chooserFactory
來創建的。預設為DefaultEventExecutorChooserFactory
。
下麵就是本小節的主題Reactor線程組
的創建過程:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//用於創建Reactor線程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
//迴圈創建reaactor group中的Reactor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//創建reactor
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
................省略................
}
}
}
//創建channel到Reactor的綁定策略
chooser = chooserFactory.newChooser(children);
................省略................
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
1. 創建用於啟動Reactor線程的executor
在Netty Reactor Group中的單個Reactor
的IO線程模型
為上篇文章提到的單Reactor單線程模型
,一個Reactor線程
負責輪詢
註冊其上的所有Channel
中的IO就緒事件
,處理IO事件,執行Netty中的非同步任務等工作。正是這個Reactor線程
驅動著整個Netty的運轉,可謂是Netty的核心引擎。
而這裡的executor
就是負責啟動Reactor線程
的,從創建源碼中我們可以看到executor
的類型為ThreadPerTaskExecutor
。
ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
我們看到ThreadPerTaskExecutor
做的事情很簡單,從它的命名首碼ThreadPerTask
我們就可以猜出它的工作方式,就是來一個任務就創建一個線程執行。而創建的這個線程正是netty的核心引擎Reactor線程。
在Reactor線程
啟動的時候,Netty會將Reactor線程
要做的事情封裝成Runnable
,丟給exexutor
啟動。
而Reactor線程
的核心就是一個死迴圈
不停的輪詢
IO就緒事件,處理IO事件,執行非同步任務。一刻也不停歇,堪稱996典範
。
這裡向大家先賣個關子,"Reactor線程是何時啟動的呢??"
2. 創建Reactor
Reactor線程組NioEventLoopGroup
包含多個Reactor
,存放於private final EventExecutor[] children
數組中。
所以下麵的事情就是創建nThread
個Reactor
,並存放於EventExecutor[] children
欄位中,
我們來看下用於創建Reactor
的newChild(executor, args)
方法:
newChild
newChild
方法是MultithreadEventExecutorGroup
中的一個抽象方法,提供給具體子類實現。
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
這裡我們解析的是NioEventLoopGroup
,我們來看下newChild
在該類中的實現:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}
前邊提到的眾多構造器參數,這裡會通過可變參數Object... args
傳入到Reactor類NioEventLoop
的構造器中。
這裡介紹下新的參數EventLoopTaskQueueFactory queueFactory
,前邊提到Netty中的Reactor
主要工作是輪詢
註冊其上的所有Channel
上的IO就緒事件
,處理IO就緒事件
。除了這些主要的工作外,Netty為了極致的壓榨Reactor
的性能,還會讓它做一些非同步任務的執行工作。既然要執行非同步任務,那麼Reactor
中就需要一個隊列
來保存任務。
這裡的EventLoopTaskQueueFactory
就是用來創建這樣的一個隊列來保存Reactor
中待執行的非同步任務。
可以把Reactor
理解成為一個單線程的線程池
,類似
於JDK
中的SingleThreadExecutor
,僅用一個線程來執行輪詢IO就緒事件
,處理IO就緒事件
,執行非同步任務
。同時待執行的非同步任務保存在Reactor
里的taskQueue
中。
NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
//用於創建JDK NIO Selector,ServerSocketChannel
private final SelectorProvider provider;
//Selector輪詢策略 決定什麼時候輪詢,什麼時候處理IO事件,什麼時候執行非同步任務
private final SelectStrategy selectStrategy;
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
}
這裡就正式開始了Reactor
的創建過程,我們知道Reactor
的核心是採用的IO多路復用模型
來對客戶端連接上的IO事件
進行監聽
,所以最重要的事情是創建Selector
(JDK NIO 中IO多路復用技術的實現
)。
可以把
Selector
理解為我們上篇文章介紹的Select,poll,epoll
,它是JDK NIO
對操作系統內核提供的這些IO多路復用技術
的封裝。
openSelector
openSelector
是NioEventLoop類
中用於創建IO多路復用
的Selector
,並對創建出來的JDK NIO
原生的Selector
進行性能優化。
首先會通過SelectorProvider#openSelector
創建JDK NIO原生的Selector
。
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
//通過JDK NIO SelectorProvider創建Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
..................省略.............
}
SelectorProvider
會根據操作系統的不同選擇JDK在不同操作系統版本下的對應Selector
的實現。Linux下會選擇Epoll
,Mac下會選擇Kqueue
。
下麵我們就來看下SelectorProvider
是如何做到自動適配不同操作系統下IO多路復用
實現的
SelectorProvider
public NioEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, SelectorProvider.provider());
}
SelectorProvider
是在前面介紹的NioEventLoopGroup類
構造函數中通過調用SelectorProvider.provider()
被載入,並通過NioEventLoopGroup#newChild
方法中的可變長參數Object... args
傳遞到NioEventLoop
中的private final SelectorProvider provider
欄位中。
SelectorProvider的載入過程:
public abstract class SelectorProvider {
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}
從SelectorProvider
載入源碼中我們可以看出,SelectorProvider
的載入方式有三種,優先順序如下:
- 通過系統變數
-D java.nio.channels.spi.SelectorProvider
指定SelectorProvider
的自定義實現類全限定名
。通過應用程式類載入器(Application Classloader)
載入。
private static boolean loadProviderFromProperty() {
String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try {
Class<?> c = Class.forName(cn, true,
ClassLoader.getSystemClassLoader());
provider = (SelectorProvider)c.newInstance();
return true;
}
.................省略.............
}
- 通過
SPI
方式載入。在工程目錄META-INF/services
下定義名為java.nio.channels.spi.SelectorProvider
的SPI文件
,文件中第一個定義的SelectorProvider
實現類全限定名就會被載入。
private static boolean loadProviderAsService() {
ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<SelectorProvider> i = sl.iterator();
for (;;) {
try {
if (!i.hasNext())
return false;
provider = i.next();
return true;
} catch (ServiceConfigurationError sce) {
if (sce.getCause() instanceof SecurityException) {
// Ignore the security exception, try the next provider
continue;
}
throw sce;
}
}
}
- 如果以上兩種方式均未被定義,那麼就採用
SelectorProvider
系統預設實現sun.nio.ch.DefaultSelectorProvider
。筆者當前使用的操作系統是MacOS
,從源碼中我們可以看到自動適配了KQueue
實現。
public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}
public static SelectorProvider create() {
return new KQueueSelectorProvider();
}
}
不同操作系統中JDK對於
DefaultSelectorProvider
會有所不同,Linux內核版本2.6以上對應的Epoll
,Linux內核版本2.6以下對應的Poll
,MacOS對應的是KQueue
。
下麵我們接著回到io.netty.channel.nio.NioEventLoop#openSelector
的主線上來。
Netty對JDK NIO 原生Selector的優化
首先在NioEventLoop
中有一個Selector優化開關DISABLE_KEY_SET_OPTIMIZATION
,通過系統變數-D io.netty.noKeySetOptimization
指定,預設是開啟的,表示需要對JDK NIO原生Selector
進行優化。
public final class NioEventLoop extends SingleThreadEventLoop {
//Selector優化開關 預設開啟 為了遍歷的效率 會對Selector中的SelectedKeys進行數據結構優化
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
}
如果優化開關DISABLE_KEY_SET_OPTIMIZATION
是關閉的,那麼直接返回JDK NIO原生的Selector
。
private SelectorTuple openSelector() {
..........SelectorProvider創建JDK NIO 原生Selector..............
if (DISABLE_KEY_SET_OPTIMIZATION) {
//JDK NIO原生Selector ,Selector優化開關 預設開啟需要對Selector進行優化
return new SelectorTuple(unwrappedSelector);
}
}
下麵為Netty對JDK NIO原生的Selector
的優化過程:
- 獲取
JDK NIO原生Selector
的抽象實現類sun.nio.ch.SelectorImpl
。JDK NIO原生Selector
的實現均繼承於該抽象類。用於判斷由SelectorProvider
創建出來的Selector
是否為JDK預設實現
(SelectorProvider
第三種載入方式)。因為SelectorProvider
可以是自定義載入,所以它創建出來的Selector
並不一定是JDK NIO 原生的。
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
JDK NIO Selector的抽象類sun.nio.ch.SelectorImpl
public abstract class SelectorImpl extends AbstractSelector {
// The set of keys with data ready for an operation
// //IO就緒的SelectionKey(裡面包裹著channel)
protected Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
//註冊在該Selector上的所有SelectionKey(裡面包裹著channel)
protected HashSet<SelectionKey> keys;
// Public views of the key sets
//用於向調用線程返回的keys,不可變
private Set<SelectionKey> publicKeys; // Immutable
//當有IO就緒的SelectionKey時,向調用線程返回。只可刪除其中元素,不可增加
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
//不可變
publicKeys = Collections.unmodifiableSet(keys);
//只可刪除其中元素,不可增加
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
}
這裡筆者來簡單介紹下JDK NIO中的Selector
中這幾個欄位的含義,我們可以和上篇文章講到的epoll在內核中的結構做類比,方便大家後續的理解:
Set<SelectionKey> selectedKeys
類似於我們上篇文章講解Epoll
時提到的就緒隊列eventpoll->rdllist
,Selector
這裡大家可以理解為Epoll
。Selector
會將自己監聽到的IO就緒
的Channel
放到selectedKeys
中。
這裡的
SelectionKey
暫且可以理解為Channel
在Selector
中的表示,類比上圖中epitem結構
里的epoll_event
,封裝IO就緒Socket的信息。
其實SelectionKey
里包含的信息不止是Channel
還有很多IO相關的信息。後面我們在詳細介紹。
HashSet<SelectionKey> keys:
這裡存放的是所有註冊到該Selector
上的Channel
。類比epoll中的紅黑樹結構rb_root
SelectionKey
在Channel
註冊到Selector
中後生成。
-
Set<SelectionKey> publicSelectedKeys
相當於是selectedKeys
的視圖,用於向外部線程返回IO就緒
的SelectionKey
。這個集合在外部線程中只能做刪除操作不可增加元素
,並且不是線程安全的
。 -
Set<SelectionKey> publicKeys
相當於keys
的不可變視圖,用於向外部線程返回所有註冊在該Selector
上的SelectionKey
這裡需要重點關註
抽象類sun.nio.ch.SelectorImpl
中的selectedKeys
和publicSelectedKeys
這兩個欄位,註意它們的類型都是HashSet
,一會優化的就是這裡!!!!
- 判斷由
SelectorProvider
創建出來的Selector
是否是JDK NIO原生的Selector
實現。因為Netty優化針對的是JDK NIO 原生Selector
。判斷標準為sun.nio.ch.SelectorImpl
類是否為SelectorProvider
創建出Selector
的父類。如果不是則直接返回。不在繼續下麵的優化過程。
//判斷是否可以對Selector進行優化,這裡主要針對JDK NIO原生Selector的實現類進行優化,因為SelectorProvider可以載入的是自定義Selector實現
//如果SelectorProvider創建的Selector不是JDK原生sun.nio.ch.SelectorImpl的實現類,那麼無法進行優化,直接返回
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
通過前面對SelectorProvider
的介紹我們知道,這裡通過provider.openSelector()
創建出來的Selector
實現類為KQueueSelectorImpl類
,它繼承實現了sun.nio.ch.SelectorImpl
,所以它是JDK NIO 原生的Selector
實現
class KQueueSelectorImpl extends SelectorImpl {
}
- 創建
SelectedSelectionKeySet
通過反射替換掉sun.nio.ch.SelectorImpl類
中selectedKeys
和publicSelectedKeys
的預設HashSet
實現。
為什麼要用SelectedSelectionKeySet
替換掉原來的HashSet
呢??
因為這裡涉及到對HashSet類型
的sun.nio.ch.SelectorImpl#selectedKeys
集合的兩種操作:
-
插入操作: 通過前邊對
sun.nio.ch.SelectorImpl類
中欄位的介紹我們知道,在Selector
監聽到IO就緒
的SelectionKey
後,會將IO就緒
的SelectionKey
插入sun.nio.ch.SelectorImpl#selectedKeys
集合中,這時Reactor線程
會從java.nio.channels.Selector#select(long)
阻塞調用中返回(類似上篇文章提到的epoll_wait
)。 -
遍歷操作:
Reactor線程
返回後,會從Selector
中獲取IO就緒
的SelectionKey
集合(也就是sun.nio.ch.SelectorImpl#selectedKeys
),Reactor線程
遍歷selectedKeys
,獲取IO就緒
的SocketChannel
,並處理SocketChannel
上的IO事件
。
我們都知道HashSet
底層數據結構是一個哈希表
,由於Hash衝突
這種情況的存在,所以導致對哈希表
進行插入
和遍歷
操作的性能不如對數組
進行插入
和遍歷
操作的性能好。
還有一個重要原因是,數組可以利用CPU緩存的優勢來提高遍歷的效率。後面筆者會有一篇專門的文章來講述利用CPU緩存行如何為我們帶來性能優勢。
所以Netty為了優化對sun.nio.ch.SelectorImpl#selectedKeys
集合的插入,遍歷
性能,自己用數組
這種數據結構實現了SelectedSelectionKeySet
,用它來替換原來的HashSet
實現。
SelectedSelectionKeySet
-
初始化
SelectionKey[] keys
數組大小為1024
,當數組容量不夠時,擴容為原來的兩倍大小。 -
通過數組尾部指針
size
,在向數組插入元素的時候可以直接定位到插入位置keys[size++]
。操作一步到位,不用像哈希表
那樣還需要解決Hash衝突
。 -
對數組的遍歷操作也是如絲般順滑,CPU直接可以在緩存行中遍歷讀取數組元素無需訪問記憶體。比
HashSet
的迭代器java.util.HashMap.KeyIterator
遍歷方式性能不知高到哪裡去了。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
//採用數組替換到JDK中的HashSet,這樣add操作和遍歷操作效率更高,不需要考慮hash衝突
SelectionKey[] keys;
//數組尾部指針
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
/**
* 數組的添加效率高於 HashSet 因為不需要考慮hash衝突
* */
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
//時間複雜度O(1)
keys[size++] = o;
if (size == keys.length) {
//擴容為原來的兩倍大小
increaseCapacity();
}
return true;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
/**
* 採用數組的遍歷效率 高於 HashSet
* */
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
看到這裡不禁感嘆,從各種小的細節可以看出Netty對性能的優化簡直淋漓盡致,對性能的追求令人髮指。細節真的是魔鬼。
- Netty通過反射的方式用
SelectedSelectionKeySet
替換掉sun.nio.ch.SelectorImpl#selectedKeys
,sun.nio.ch.SelectorImpl#publicSelectedKeys
這兩個集合中原來HashSet
的實現。
- 反射獲取
sun.nio.ch.SelectorImpl
類中selectedKeys
和publicSelectedKeys
。
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Java9
版本以上通過sun.misc.Unsafe
設置欄位值的方式
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
- 通過反射的方式用
SelectedSelectionKeySet
替換掉hashSet
實現的sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys
。
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
//Java8反射替換欄位
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
- 將與
sun.nio.ch.SelectorImpl
類中selectedKeys
和publicSelectedKeys
關聯好的Netty優化實現SelectedSelectionKeySet
,設置到io.netty.channel.nio.NioEventLoop#selectedKeys
欄位中保存。
//會通過反射替換selector對象中的selectedKeySet保存就緒的selectKey
//該欄位為持有selector對象selectedKeys的引用,當IO事件就緒時,直接從這裡獲取
private SelectedSelectionKeySet selectedKeys;
後續
Reactor線程
就會直接從io.netty.channel.nio.NioEventLoop#selectedKeys
中獲取IO就緒
的SocketChannel
- 用
SelectorTuple
封裝unwrappedSelector
和wrappedSelector
返回給NioEventLoop
構造函數。到此Reactor
中的Selector
就創建完畢了。
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
-
所謂的
unwrappedSelector
是指被Netty優化過的JDK NIO原生Selector。 -
所謂的
wrappedSelector
就是用SelectedSelectionKeySetSelector
裝飾類將unwrappedSelector
和與sun.nio.ch.SelectorImpl類
關聯好的Netty優化實現SelectedSelectionKeySet
封裝裝飾起來。
wrappedSelector
會將所有對Selector
的操作全部代理給unwrappedSelector
,併在發起輪詢IO事件
的相關操作中,重置SelectedSelectionKeySet
清空上一次的輪詢結果。
final class SelectedSelectionKeySetSelector extends Selector {
//Netty優化後的 SelectedKey就緒集合
private final SelectedSelectionKeySet selectionKeys;
//優化後的JDK NIO 原生Selector
private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
@Override
public boolean isOpen() {
return delegate.isOpen();
}
@Override
public SelectorProvider provider() {
return delegate.provider();
}
@Override
public Set<SelectionKey> keys() {
return delegate.keys();
}
@Override
public Set<SelectionKey> selectedKeys() {
return delegate.selectedKeys();
}
@Override
public int selectNow() throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.selectNow();
}
@Override
public int select(long timeout) throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.select(timeout);
}
@Override
public int select() throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.select();
}
@Override
public Selector wakeup() {
return delegate.wakeup();
}
@Override
public void close() throws IOException {
delegate.close();
}
}
到這裡Reactor的核心Selector就創建好了,下麵我們來看下用於保存非同步任務的隊列是如何創建出來的。
newTaskQueue
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
//通過用SelectedSelectionKeySet裝飾後的unwrappedSelector
this.selector = selectorTuple.selector;
//Netty優化過的JDK NIO遠程Selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
我們繼續回到創建Reactor
的主線上,到目前為止Reactor
的核心Selector
就創建好了,前邊我們提到Reactor
除了需要監聽IO就緒事件
以及處理IO就緒事件
外,還需要執行一些非同步任務,當外部線程向Reactor
提交非同步任務後,Reactor
就需要一個隊列來保存這些非同步任務,等待Reactor線程
執行。
下麵我們來看下Reactor
中任務隊列的創建過程:
//任務隊列大小,預設是無界隊列
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
-
在
NioEventLoop
的父類SingleThreadEventLoop
中提供了一個靜態變數DEFAULT_MAX_PENDING_TASKS
用來指定Reactor
任務隊列的大小。