上篇文章中我們梳理了ChannelPipeline中入站事件的傳播,這篇文章中我們看下出站事件的傳播,也就是ChannelOutboundHandler介面的實現。 1、出站事件的傳播示例 我們對上篇文章中的示例代碼進行改造,在ChannelPipeline中加入ChannelOutboundHan ...
上篇文章中我們梳理了ChannelPipeline中入站事件的傳播,這篇文章中我們看下出站事件的傳播,也就是ChannelOutboundHandler介面的實現。
1、出站事件的傳播示例
我們對上篇文章中的示例代碼進行改造,在ChannelPipeline中加入ChannelOutboundHandler出站實現
public class ServerApp { public static void main(String[] args) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup work = new NioEventLoopGroup(2); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // p.addLast(new LoggingHandler(LogLevel.INFO)); // 向ChannelPipeline中添加自定義channelHandler p.addLast(new OutHandlerA()); p.addLast(new ServerHandlerA()); p.addLast(new ServerHandlerB()); p.addLast(new ServerHandlerC()); p.addLast(new OutHandlerB()); p.addLast(new OutHandlerC()); } }); bootstrap.bind(8050).sync(); } catch (Exception e) { // TODO: handle exception } } } public class OutHandlerA extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.err.println(this.getClass().getName()+msg); ctx.writeAndFlush((ByteBuf)msg); } } public class OutHandlerB extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) { System.out.println(this.getClass().getName()+msg); ctx.write((ByteBuf)msg); } } public class OutHandlerC extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) { System.out.println(this.getClass().getName()+"--"+msg); ctx.write((ByteBuf)msg); } }
然後我們在ServerHandlerA的channelRead方法中執行ctx的write方法,模擬消息出站事件的發生。
public class ServerHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object object) { ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer(); byteBuf.writeByte(1); byteBuf.writeByte(2); ctx.channel().write(byteBuf); //ctx.write(byteBuf); } }
上面channelRead方法中write方法的調用有兩種方式 ctx.channel().write 與 ctx.write,這兩種方式有何區別呢,我們首先看下這兩種方式的運行結果
ctx.channel().write
io.netty.example.echo.my.OutHandlerC--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256) io.netty.example.echo.my.OutHandlerB--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256) io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)
ctx.write
io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)
可以看到當調用ctx.channel().write時,消息在管道中傳播的順序是從尾部一直傳遞到最上層的OutboundHandler;而 ctx.write會從所在的 handler 向前找 OutboundHandler。
那麼這兩種方式區別是否就如結果所示呢,下麵我們就開始對這兩種方法的內部實現進行分析
2、出站事件傳播的分析
ctx.channel().write與 ctx.write 分別用的是AbstractChannel與AbstractChannelHandlerContext的write方法
AbstractChannel 的 write方法
@Override public ChannelFuture write(Object msg) { return pipeline.write(msg); }
AbstractChannelHandlerContext 的 write方法
@Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); }
上面代碼中AbstractChannel的 wirte方法最終調用的是pipeline的write方法,我們進入pipeline內部查看,可以看到pipeline的write方法預設從尾部AbstractChannelHandlerContext節點開始調用。
@Override public final ChannelFuture write(Object msg) { return tail.write(msg); }
繼續向下跟蹤最終它們調用的都是AbstractChannelHandlerContext 的 write方法,下麵我們看下方法內部的具體實現。
private void write(Object msg, boolean flush, ChannelPromise promise) { ObjectUtil.checkNotNull(msg, "msg"); try { if (isNotValidPromise(promise, true)) {//檢查ChannelPromise是否有效 ReferenceCountUtil.release(msg); // cancelled return; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } //尋找上一個AbstractChannelHandlerContext節點 AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) {//與當前線程是否一致 if (flush) {//確定是否要把數據沖刷到遠程節點 next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { //如果不一致的封裝成writeTask任務線程 final AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } //把該線程任務交給對應的EventExecutor執行 if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. task.cancel(); } } }
主要關註下findContextOutbound(),這個方法的作用就是獲取當前AbstractChannelHandlerContext節點的上一個節點prev
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev;//獲取當前節點的上一個節點 } while (!ctx.outbound);//判斷是不是出站節點 return ctx; }
最終通過next.invokeWrite(m, promise)回調方法,調用下一個節點中封裝的ChannelOutboundHandler的write方法,從而實現write方法事件的傳遞
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) {//判斷當前ChannelOutboundHandler是否已經被添加到pipeline中(handlerAdded事件觸發) invokeWrite0(msg, promise); } else { write(msg, promise); } } private boolean invokeHandler() { // Store in local variable to reduce volatile reads. int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); } private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
到這裡整個出站事件的傳播流程已經基本清晰了,wirte方法本身就是一個尋找並回調下一個節點中wirte方法的過程。
3、write與writeAndFlush
在上面代碼中可以看到這兩個方法主要在於是否會在執行write方法後,是否會執行flush方法。
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //是否調用回調方法 //調用write與flush回調方法,最終調用自定義hander的對應實現 invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
這裡需要註意的是invokeFlush0()在invokeWrite0後執行,也就是必須等到消息出站事件傳遞完畢後,才會調用flush把數據沖刷到遠程節點。簡單理解就是你無論是在OutHandlerA、OutHandlerB還是OutHandlerC中調用writeAndFlush,最後都是要在write事件傳遞完畢才會flush數據的。
同時我們需要註意到當write與flush事件從OutHandlerA再往上傳遞時,OutHandlerA的的上一個節點就是Pipeline的頭節點HeadContext,我們看下HeadContext的write與flush方法實現;
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) { unsafe.flush(); }
到這裡我們可以看出,消息的真正入隊與發送最終是通過HeadContext的write與flush方法實現。
通過以上的分析我們可以看到Pipeline出站事件的傳播流程,同時我們需要註意ctx.write與ctx.channel().write的區別以及消息的發送最終是通頭部節點調用unsafe的write與flush方法實現的,其中如有不足與不正確的地方還望指出與海涵。
關註微信公眾號,查看更多技術文章。