netty服務端啟動 ServerBootstrap源碼解析 前面的第一篇文章中,我以spark中的netty客戶端的創建為切入點,分析了netty的客戶端引導類Bootstrap的參數設置以及啟動過程。顯然,我們還有另一個重要的部分 服務端的初始化和啟動過程沒有探究,所以這一節,我們就來從源碼層面 ...
netty服務端啟動--ServerBootstrap源碼解析
前面的第一篇文章中,我以spark中的netty客戶端的創建為切入點,分析了netty的客戶端引導類Bootstrap的參數設置以及啟動過程。顯然,我們還有另一個重要的部分--服務端的初始化和啟動過程沒有探究,所以這一節,我們就來從源碼層面詳細分析一下netty的服務端引導類ServerBootstrap的啟動過程。
spark中netty服務端的創建
我們仍然以spark中對netty的使用為例,以此為源碼分析的切入點,首先我們看一下spark的NettyRpc模塊中創建netty服務端引導類的代碼:
TransportServer.init
TransportServer的構造方法中會調用init方法,ServerBootstrap類就是在init方法中被創建並初始化以及啟動的。
這個方法主要分為三塊:
- 創建ServerBootstrap對象,並設置各種參數。我們看到,這裡的bossGroup和workerGroup是同一個線程組,此外還設置了socket的一些參數如排隊的連接數,接收緩衝區,發送緩衝區大小等。
- 設置childHandler參數,之所以把這個參數的設置單獨拿出來就是為了凸顯這個參數的重要性,childHandler參數是用戶實現時間處理邏輯的地方
- 最後將服務端綁定到某個埠,同時在綁定的過程中也會啟動服務端,開始監聽io事件。
很顯然,ServerBootstrap的啟動入口就是bind方法。
// 初始化netty服務端
private void init(String hostToBind, int portToBind) {
// io模式,有兩種選項NIO, EPOLL
IOMode ioMode = IOMode.valueOf(conf.ioMode());
// 創建bossGroup和workerGroup,即主線程組合子線程組
EventLoopGroup bossGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;
// 緩衝分配器,分為堆記憶體和直接記憶體
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
// 創建一個netty服務端引導對象,並設置相關參數
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.childOption(ChannelOption.ALLOCATOR, allocator);
// 記憶體使用的度量對象
this.metrics = new NettyMemoryMetrics(
allocator, conf.getModuleName() + "-server", conf);
// 排隊的連接數
if (conf.backLog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
}
// socket接收緩衝區大小
if (conf.receiveBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}
// socket發送緩衝區大小
if (conf.sendBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
}
// 子channel處理器
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
RpcHandler rpcHandler = appRpcHandler;
for (TransportServerBootstrap bootstrap : bootstraps) {
rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
}
context.initializePipeline(ch, rpcHandler);
}
});
InetSocketAddress address = hostToBind == null ?
new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
// 綁定到ip地址和埠
channelFuture = bootstrap.bind(address);
// 同步等待綁定成功
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port: {}", port);
}
AbstractBootstrap.init(SocketAddress localAddress)
這裡的校驗主要是對group和channelFactory的非空校驗
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
AbstractBootstrap.doBind
這個方法,我們之前在分析Bootstrap的啟動過程時提到過,它的主要作用如下:
- 通過反射根據傳入的channel類型創建一個具體的channel對象
- 調用init方法對這個channel對象進行初始化
- 將初始化完成的channel對象註冊到一個EventLoop線程上
之前,我們分析了NioSocketChannel的構造過程,以及Bootstarp中對channel的初始化過程,
本節我們要分析NioServerSocketChannel的構造過程,以及ServerBootstrap的init方法的實現。
private ChannelFuture doBind(final SocketAddress localAddress) {
// 創建一個channel,並對這個channel做一些初始化工作
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 將這個channel綁定到指定的地址
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {// 對於尚未註冊成功的情況,採用非同步的方式,即添加一個回調
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
NioServerSocketChannel的構造方法
這裡通過調用jdk的api創建了一個ServerSocketChannel。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
與NioSocketChannelConfig類似,NioServerSocketChannelConfig也是一種門面模式,是對NioServerSocketChannel中的參數介面的封裝。
此外,我們註意到,這裡規定了NioServerSocketChannel的初始的感興趣的事件是ACCEPT事件,即預設會監聽請求建立連接的事件。
而在NioSocketChannel中的初始感興趣的事件是read事件。
所以,這裡與NioSocketChannel構造過程最主要的不同就是初始的感興趣事件不同。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
這裡首先調用了父類的構造方法,最終調用了AbstractNioChannel類的構造方法,這個過程我們在之前分析NioSocketChannel初始化的時候已經詳細說過,主要就是創建了內部的Unsafe對象和ChannelPipeline對象。
ServerBootstrap.init
分析完了channel的構造過程,我們再來看一下ServerBootstrap是怎麼對channel對象進行初始化的。
- 設置參數,設置屬性
- 獲取子channel的參數和屬性,以便在有新的連接時給新創建的channel設置參數和屬性
- 給serverChannel中添加一個重要的handler,這個handler中實現了對新創建的channel的處理邏輯。
所以,很顯然,我們接下來就要看一下這個特殊的handler,ServerBootstrapAcceptor的read方法。
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
// 設置參數
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 設置屬性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
// 子channel的group和handler參數
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
// 添加處理器
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 一般情況下,對於ServerBootstrap用戶無需設置handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 這裡添加了一個關鍵的handler,並且順手啟動了對應的EventLoop的線程
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch)
在分析ServerBootstrapAcceptor之前,我們首先來回顧一下NioEventLoop的迴圈中,對於accept事件的處理邏輯,這裡截取其中的一小段代碼:
// 處理read和accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
可見,對於accept事件和read事件一樣,調用NioUnsafe的read方法
AbstractNioMessageChannel.NioMessageUnsafe.read
因為NioServerSocketChannel繼承了AbstractNioMessageChannel,並且read方法的實現也是在AbstractNioMessageChannel中,
- doReadMessages是一個抽象方法,在NioServerSocketChannel的實現中,這個方法調用jdk的api接收一個連接,並包裝成NioSocketChannel對象
- 以讀取到的channel對象作為消息,在channelPipeline中觸發一個讀事件
根據前面對channelPipeline的分析,我們知道,讀事件對從頭結點開始,向尾節點傳播。上面我們也提到了,對於初始的那個NioServerSocketChannel,會在ServerBootstarp的init方法中向這個channel的處理鏈中加入一個ServerBootstrapAcceptor處理器,所以,很顯然,接下來我們應該分析ServerBootstrapAcceptor中對讀事件的處理。
public void read() {
// 確認當前代碼的執行是在EventLoop的線程中
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 這裡讀取到的是建立的連接對應的channel,
// jdk的socketChannel被包裝成了netty的NioSocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 把接收到的每一個channel作為消息,在channelPipeline中觸發一個讀事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 最後觸發一個讀完成的事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
ServerBootstrapAcceptor.channelRead
代碼邏輯還是比較簡單的,因為有了前面的鋪墊,即在ServerBootstrap的init方法對創始的那個serverChannel進行初始化時,將用戶設置的子channel的參數,屬性,子channel的handler和子group等參數作為構造參數全部傳給了ServerBootstrapAcceptor,所以在這裡直接用就行了。
其實這裡的子channel的初始化和註冊過程和Bootstrap中對一個新創建的channel的初始化過程基本一樣,區別在於Bootstrap中channel是用戶代碼通過調用connect方法最終在initAndregistry中通過反射構造的一個對象;而在服務端,通過監聽ServerSocketChannel的accept事件,當有新的連接建立請求時,會自動創建一個SocketChannel(jdk的代碼實現),然後NioServerSocketChannel將其包裝成一個NioSocketChannel,並作為消息在傳遞給處理器,所以在ServerSocketChannel中的子channel的創建是由底層的jdk的庫實現的。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 類型轉換,這裡的強制轉換是安全的的,
// 是由各種具體的AbstractNioMessageChannel子類型的實現保證的
// 各種具體的AbstractNioMessageChannel子類型的讀方法確保它們讀取並最終返回的是一個Channel類型
final Channel child = (Channel) msg;
// 給子channel添加handler
child.pipeline().addLast(childHandler);
// 給子channel設置參數
setChannelOptions(child, childOptions, logger);
// 給子channel設置屬性
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 將子channel註冊到子group中
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
AbstractBootstrap.doBind0
回到doBind方法中,在完成了channel的構造,初始化和註冊邏輯後,接下來就要把這個server類型的channel綁定到一個地址上,這樣才能接受客戶端建立連接的請求。
從代碼中可以看出,調用了channel的bind方法實現綁定的邏輯。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 調用了channel.bind方法完成綁定的邏輯
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
AbstractChannel.bind
bind操作的傳遞是從尾節點開始向前傳遞,所以我們直接看Headcontext對於bind方法的實現
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
DefaultChannelPipeline.bind
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
HeadContext.bind
調用了unsafe的bind方法。
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
因為後面右有幾個事件的觸發,每個觸發事件都是通過channel的相關方法來觸發,然後又是通過channelpipeline的傳遞事件,這些事件最後基本都是由HeadContext處理了,所以這裡我只簡單地敘述一下後面的 大概邏輯,代碼比較繁瑣,而且很多都是相同的調用過程,所以就不貼代碼了。
- 通過前面的分析,我們知道首先通過channel觸發了一個bind操作,這個操作的實現最終由HeadCOntex實現,HeadContex的實現中是調用了unsafe.bind
- bind的實現邏輯中,首先通過jdk的api完成了ServerSocketChannel的綁定,然後又觸發了一個channelActive的事件,這個事件的處理最終也是有HeadContext實現
- 在HeadContext對channelActive操作的實現中,觸發了一個read()操作,註意這裡的這個read方法是不帶參數的,是ChannelOutboundInvoker介面中定義的一個方法,也是有HeadContext實現
- HeadContext對read操作的實現中,調用了Unsafe.beginRead方法,經過幾個子類的具體實現後,最終由AbstractNioChannel.doBeginRead實現具體的開始讀的邏輯,
從代碼中可以看出來,最終調用了jdk的api,將感興趣的事件添加到selectionKey中。通過前面的 分析,我們知道對於NioSocketChannel,它的感興趣的讀事件類型是SelectionKey.OP_READ,也就是讀事件;
而對於NioServerSocketChannel,根據前面對其構造方法的分析,它的感興趣的事件是SelectionKey.OP_ACCEPT,也就是建立連接的事件。
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 將讀事件類型加入到selectionKey的感興趣的事件中
// 這樣jdk底層的selector就會監聽相應類型的事件
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
總結
到這裡,我們就把ServerBootstrap的主要功能代碼分析完了,這裡面主要包括三個方面:
- ServerBootstrap中對server類型的channel的初始化,包括最重要的handler----ServerBootstrapAcceptor的添加
- ServerBootstrapAcceptor中對於新創建的子channel的處理,包括初始化和註冊的邏輯
- 將serverChannel綁定到具體的地址上,綁定過程中也啟動了對應的註冊的線程。