EventLoop 在之前介紹Bootstrap的初始化以及啟動過程時,我們多次接觸了NioEventLoopGroup這個類,關於這個類的理解,還需要瞭解netty的線程模型。NioEventLoopGroup可以理解為一組線程,這些線程每一個都可以獨立地處理多個channel產生的io事件。 N ...
EventLoop
在之前介紹Bootstrap的初始化以及啟動過程時,我們多次接觸了NioEventLoopGroup這個類,關於這個類的理解,還需要瞭解netty的線程模型。NioEventLoopGroup可以理解為一組線程,這些線程每一個都可以獨立地處理多個channel產生的io事件。
NioEventLoopGroup初始化
我們看其中一個參數比較多的構造方法,其他一些參數較少的構造方法使用了一些預設值,使用的預設參數如下:
- SelectorProvider類型,用於創建socket通道,udp通道,創建Selector對象等,預設值是SelectorProvider.provider(),大部分情況下使用預設值就行,這個方法最終創建的是一個WindowsSelectorProvider對象
- SelectStrategyFactory,Select策略類的工廠類,它的預設值是DefaultSelectStrategyFactory.INSTANCE,就是一個SelectStrategyFactory對象本身,而SelectStrategyFactory工廠產生的是DefaultSelectStrategy策略類。
- RejectedExecutionHandler,拒絕任務的策略類,決定在任務隊列已滿時採取什麼樣的策略,類似於jdk線程池的RejectedExecutionHandler的作用
接下來,我們看一下其中的一個常用的構造方法,
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
可見,當前類中並沒有什麼初始化邏輯,直接調用了父類的構造方法,所以我們接著看父類MultithreadEventLoopGroup的構造方法:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
同樣,並未做任務處理,直接調用父類構造方法,所以我們接著看MultithreadEventExecutorGroup構造方法,初始化邏輯的實現在這個類中,
MultithreadEventExecutorGroup構造方法
通過上一小結的分析,我們知道NioEventLoopGroup的構造方法的主要邏輯的實現是在MultithreadEventExecutorGroup類中,並且在調用構造方法的過程中加上了一個參數的預設值,即EventExecutorChooserFactory類型參數的預設值DefaultEventExecutorChooserFactory.INSTANCE,這個類以輪詢(roundrobin)的方式從多個線程中依次選出線程用於註冊channel。
總結一下這段代碼的主要步驟:
- 首先是一些變數的非空檢查和合法性檢查
- 然後根據傳入的線程數量,創建若幹個子執行器,每個執行器對應一個線程
- 最後以子執行器數組為參數,使用選擇器工廠類創建一個選擇器
最後給每個子執行器添加一個監聽器,以監聽子執行器的終止,做一些簿記工作,使得在所有子執行器全部終止後將當前的執行器組終止
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 首先是變數的非空檢查以及合法性判斷,
// nThreads在MultithreadEventLoopGroup的構造方法中已經經過一些預設值處理,
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}// 這裡一般都會使用預設值, // ThreadPerTaskExecutor的作用即字面意思,一個任務一個線程 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 子執行器的數組,一個子執行器對應一個線程 children = new EventExecutor[nThreads]; // 根據傳入的線程數量創建多個自執行器 // 註意,這裡子執行器創建好後並不會立即運行起來 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 如果創建子執行器不成功,那麼需要將已經創建好的子執行器也全部銷毀 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } // 等待所以子執行器停止後在退出 for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 創建一個子執行器的選擇器,選擇器的作用是從子執行器中選出一個 // 預設使用roundRobin的方式 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; // 給每個子執行器添加監聽器,在子執行器終止的時候做一些工作 // 每有一個子執行器終止時就將terminatedChildren變數加一 // 當所有子執行器全部終止時,當前這個執行器組就終止了 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 包裝一個不可變的集合 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
NioEventLoopGroup.newChild
上面的方法中調用了newChild方法來創建一個子執行器,而這個方法是一個抽象方法,我們看NioEventLoopGroup類的實現:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
可見僅僅是簡單地創建了一個NioEventLoop對象。
小結
到這裡,我們就把NioEventLoopGroup的初始化過程分析完了。我們不禁思考,既然NioEventLoopGroup是一個執行器組,說白了就是一組線程,那這些線程是什麼時候跑起來的呢?如果讀者還有印象,應該能記得我們在分析Bootstrap建立連接過程時,channel初始化之後需要註冊到EventLoopGroup中,其實是註冊到其中的一個EventLoop上,註冊邏輯最終是在AbstractChannel.AbstractUnsafe.register方法中實現的,其中有一段代碼:
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
首先調用eventLoop.inEventLoop()判斷執行器的線程與當前線程是否是同一個,如果是則直接執行註冊的代碼,如果不是就調用eventLoop.execute將註冊邏輯封裝成一個任務放到執行器的任務隊列中,接下里我們就以這個方法為切入點,探究一下子執行器線程的啟動過程。
AbstractEventExecutor.inEventLoop
首先,讓我們來看一下這個方法,這個方法的作用是判斷當前線程與執行器的線程是否同一個線程。
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
SingleThreadEventExecutor.inEventLoop
代碼很簡單,就不多說了。
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
SingleThreadEventExecutor.execute
方法很簡單,核心邏輯在startThread方法中,
public void execute(Runnable task) {
// 非空檢查
if (task == null) {
throw new NullPointerException("task");
}
// 執行到這裡一般都是外部調用者,
boolean inEventLoop = inEventLoop();
// 向任務隊列中添加一個任務
addTask(task);
// 如果當前線程不是執行器的線程,那麼需要檢查執行器線程是否已經運行,
// 如果還沒在運行,就需要啟動線程
if (!inEventLoop) {
startThread();
// 檢查線程是否被關閉
if (isShutdown()) {
boolean reject = false;
try {
// 將剛剛添加的任務移除
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
// addTaskWakesUp不知道這個變數意義是什麼,NioEventLoop傳進來的是false
// 向任務隊列中添加一個空任務,這樣就能夠喚醒阻塞的執行器線程
// 有些情況下執行器線程會阻塞在taskQueue上,
// 所以向阻塞隊列中添加一個元素能夠喚醒哪些因為隊列空而被阻塞的線程
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
SingleThreadEventExecutor.startThread
這個方法的主要作用是維護內部的狀態量state,使用cas指令併發情況下對狀態量的修改是線程安全的,並且對於狀態量的判斷保證啟動邏輯只被執行一次
private void startThread() {
// 狀態量的維護
if (state == ST_NOT_STARTED) {
// 這裡使用了jdk中的原子更新器AtomicIntegerFieldUpdater類,
// 使用cpu的cas指令保證併發情況下能夠安全地維護狀態量
// 保證只有一個線程能夠執行啟動邏輯,保證啟動邏輯只被執行一次
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 實際啟動線程的邏輯
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
SingleThreadEventExecutor.doStartThread
這個方法我就不貼代碼了,說一下它的主要作用:
- 使用內部的Executor對象(一般是一個ThreadPerTaskExecutor)啟動一個線程,並執行任務
- 維護執行器的運行狀態,主要是通過內部的狀態量和cas指令來保證線程安全;此外維護內部的一些簿記量,例如線程本身的引用,線程啟動時間等
- 線程結束時做一些收尾和清理工作,例如將剩餘的任務跑完,運行關閉鉤子,關閉底層的selector(這個是具體的子類的清理邏輯),同時更新狀態量
具體的業務邏輯仍然是在子類中實現的,也就是SingleThreadEventExecutor.run()方法的具體實現。
NioEventLoop.run
我們仍然以NioEventLoop為例,看一下它實現的run方法。還大概講一下它的主要邏輯:
- 首選這個方法是一個迴圈,不斷地通過調用jdk底層的selector接收io事件,並對不同的io事件做處理,同時也會處理任務隊列中的任務,以及定時調度或延遲調度的任務
- 調用jdk的api, selector接收io事件
- 處理各種類型的io事件
- 處理任務
這裡,我就不貼代碼了,其中比較重要的是對一些併發情況的考慮和處理,如selector的喚醒時機。接下來,主要看一下對於各種io事件的處理,至於任務隊列以及調度隊列中任務的處理比較簡單,就不展開了。
NioEventLoop.processSelectedKeysOptimized
這個方法會遍歷所有接受到的io事件對應的selectionKey,然後依次處理。
private void processSelectedKeysOptimized() {
// 遍歷所有的io事件的SelectionKey
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 處理事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 如果需要重新select,那麼把後面的selectionKey全部置0,然後再次調用selectNow方法
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
NioEventLoop.processSelectedKey
這個方法首先對SelectionKey無效的情況做了處理,分為兩種情況:channel本身無效了;channel仍然是正常的,只不過是被從當前的selector上註銷了,可能在其他的selector中仍然是正常運行的
- 對於第一種情況,需要關閉channel,即關閉底層的連接
- 對於第二種情況則不需要做任何處理。
接下來,我們著重分析一下對於四種事件的處理邏輯。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 如果selectionKey是無效的,那麼說明相應的channel是無效的,此時需要關閉這個channel
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
// 只關閉註冊在當前EventLoop上的channel,
// 理論上來說,一個channel是可以註冊到多個Eventloop上的,
// SelectionKey無效可能是因為channel從當前EventLoop上註銷了,
// 但是channel本身依然是正常的,並且註冊在其他的EventLoop中
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
// 到這裡說明channel已經無效了,關閉它
unsafe.close(unsafe.voidPromise());
return;
}
// 下麵處理正常情況
try {
// 準備好的io事件
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// 處理connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// 處理write事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 處理read和accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
connect事件處理
從代碼中可以看出,connect事件的處理時通過調用NioUnsafe.finishConnect完成的,我們看一下AbstractNioUnsafe.finishConnect的實現:
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
try {
// 是否已經處於連接成功的狀態
boolean wasActive = isActive();
// 抽象方法,有子類實現
doFinishConnect();
// 處理future對象,將其標記為成功
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
可以看出,主要是通過調用doFinishConnect實現完成連接的邏輯,具體到子類中,NioSocketChannel.doFinishConnect的實現是:
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
write事件處理
對於的write事件的處理時通過調用NioUnsafe.forceFlush方法完成,最終的實現在AbstractChannel.AbstractUnsafe.flush0中:
大體上看,這個方法的邏輯比較簡單,但是實際上最複雜也是最核心的寫入邏輯在子類實現的doWrite方法中。由於本篇的重點在於把NioEventLoop的主幹邏輯梳理一下,所以這裡不再繼續展開,後面會單獨來分析這一塊的源碼,這裡涉及到netty中對緩衝區的封裝,其中涉及到一些比較複雜的邏輯。
protected void flush0() {
// 如果正在寫數據,直接返回
if (inFlush0) {
// Avoid re-entrance
return;
}
// 輸出的緩衝區
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
// 將緩衝區的數據寫入到channel中
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t), false);
}
}
} finally {
inFlush0 = false;
}
}
read事件和accept事件處理
乍看會比較奇怪,為什麼這兩個事件要放到一起處理呢,他們明明是不同的事件。這裡主要還是考慮到編碼的統一,因為read事件只有NioSocketChannel才會有,而accept事件只有NioServerSocketChannel才會有,所以這裡通過抽象方法,讓不同的子類去實現各自的邏輯,是的代碼結構上更統一。我們這裡看一下NioScketChannel的實現,而對於NioServerSocketChannel的實現我會在後續分析netty服務端的啟動過程時在具體講到,即ServerBootstrap的啟動過程。
NioByteUnsafe.read
總結一下這個方法的主要邏輯:
- 首先會獲取緩衝分配器和相應的處理器RecvByteBufAllocator.Handle對象
- 迴圈讀取數據,每次分配一個一定大小(大小可配置)的緩衝,將channel中待讀取的數據讀取到緩衝中
- 以裝載有數據的緩衝為消息體,向channel的處理流水線(即pipeline)中觸發一個讀取的事件,讓讀取到的數據在流水線中傳播,被各個處理器處理
- 重覆此過程,知道channel中沒有可供讀取的數據
- 最後向pipeline中觸發一個讀取完成的事件
最後還要根據最後一次讀取到的數據量決定是否關閉通道,如果最後一次讀取到的數據量小於0,說明對端已經關閉了輸出,所以這裡需要將輸入關閉,即通道處於半關閉狀態。
public final void read() { final ChannelConfig config = config(); // 如果通道已經關閉,那麼就不需要再讀取數據,直接返回 if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); // 緩衝分配器 final ByteBufAllocator allocator = config.getAllocator(); // 緩衝分配的處理器,處理緩衝分配,讀取計數等 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 分配一個緩衝 byteBuf = allocHandle.allocate(allocator); // 將通道的數據讀取到緩衝中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 如果沒有讀取到數據,說明通道中沒有待讀取的數據了, if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. // 因為沒讀取到數據,所以應該釋放緩衝 byteBuf.release(); byteBuf = null; // 如果讀取到的數據量是負數,說明通道已經關閉了 close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } // 更新Handle內部的簿記量 allocHandle.incMessagesRead(1); readPending = false; // 向channel的處理器流水線中觸發一個事件, // 讓取到的數據能夠被流水線上的各個ChannelHandler處理 pipeline.fireChannelRead(byteBuf); byteBuf = null; // 這裡根據如下條件判斷是否繼續讀: // 上一次讀取到的數據量大於0,並且讀取到的數據量等於分配的緩衝的最大容量, // 此時說明通道中還有待讀取的數據 } while (allocHandle.continueReading()); // 讀取完成 allocHandle.readComplete(); // 觸發一個讀取完成的事件 pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 // 這裡isAutoRead預設是true, 所以正常情況下會繼續監聽read事件 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
總結
本篇主要分析了EventLoop的事件監聽以及處理邏輯,此外處理處理io事件,也會處理添加進來的任務和定時調度任務和延遲調度任務。EventLoop就像是整個框架的發動機或者說是心臟,它通過jdk api進而簡介地調用系統調用,不斷地監聽各種io事件,同時對不同的io事件分門別類採用不同的處理方式,對於read事件則會將網路io數據讀取到緩衝中,並將讀取到的數據傳遞給用戶的處理器進行鏈式處理。Channelpipeline就像一個流水線一樣,對觸發的的各種事件進行處理。
遺留問題
- NioSocketChannel.doWrite方法的寫入邏輯的,待進一步分析
- ChannelPipeline的詳細分析,各種事件是怎麼在處理器之間傳播的,設計模式,代碼結構等
- 緩衝分配器和緩衝處理器的分析,它們是怎麼對記憶體進行管理的,這也是netty高性能的原因之一。