Bootstrap初始化過程 netty的客戶端引導類是Bootstrap,我們看一下spark的rpc中客戶端部分對Bootstrap的初始化過程 TransportClientFactory.createClient(InetSocketAddress address) 只需要貼出Bootstr ...
Bootstrap初始化過程
netty的客戶端引導類是Bootstrap,我們看一下spark的rpc中客戶端部分對Bootstrap的初始化過程
TransportClientFactory.createClient(InetSocketAddress address)
只需要貼出Bootstrap初始化部分的代碼
// 客戶端引導對象
Bootstrap bootstrap = new Bootstrap();
// 設置各種參數
bootstrap.group(workerGroup)
.channel(socketChannelClass)
// Disable Nagle's Algorithm since we don't want packets to wait
// 關閉Nagle演算法
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);
// socket接收緩衝區
if (conf.receiveBuf() > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}
// socket發送緩衝區
// 對於接收和發送緩衝區的設置應該用如下的公式計算:
// 延遲 *帶寬
// 例如延遲是1ms,帶寬是10Gbps,那麼緩衝區大小應該設為1.25MB
if (conf.sendBuf() > 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
}
final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
final AtomicReference<Channel> channelRef = new AtomicReference<>();
// 設置handler(處理器對象)
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
TransportChannelHandler clientHandler = context.initializePipeline(ch);
clientRef.set(clientHandler.getClient());
channelRef.set(ch);
}
});
// Connect to the remote server
long preConnect = System.nanoTime();
// 與服務端建立連接,啟動方法
ChannelFuture cf = bootstrap.connect(address);
分為幾個主要的步驟:
- 首先創建一個Bootstrap對象,調用的是無參構造器
- 設置各種參數,如通道類型,關閉Nagle演算法,接收和發送緩衝區大小,設置處理器
- 調用connect與服務端建立連接
接下來,我們主要通過兩條線索來分析Bootstrap的啟動過程,即構造器和connect兩個方法,而對於設置參數的過程僅僅是給內部的一些成員變數賦值,所以不需要詳細展開。
Bootstrap.Bootstrap()
Bootstrap繼承了AbstractBootstrap,看了一下他們的無參構造方法,都是個空方法。。。。。。所以這一步,我們就省了,瞬間感覺飛起來了有沒有^_^
Bootstrap.connect(SocketAddress remoteAddress)
public ChannelFuture connect(SocketAddress remoteAddress) {
// 檢查非空
ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
// 同樣是對一些成員變數檢查非空,主要檢查EventLoopGroup,ChannelFactory,handler對象
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}
主要是做了一些非空檢查,需要註意的是,ChannelFactory對象的設置,前面的spark中在對Bootstrap初始化設置的時候調用了.channel(socketChannelClass)方法,這個方法如下:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
創建了一個ReflectiveChannelFactory對象,並賦值給內部的channelFactory成員。這個工廠類會根據傳進來的Class對象通過反射創建一個Channel實例。
doResolveAndConnect
從這個方法的邏輯中可以看出來,創建一個連接的過程分為兩個主要的步驟;
- 初始化一個Channel對象並註冊到EventLoop中
- 調用doResolveAndConnect0方法完成tcp連接的建立
值得註意的是,initAndRegister方法返回一個Future對象,這個類型通常用於非同步機制的實現。在這裡,如果註冊沒有立即成功的話,會給返回的futrue對象添加一個監聽器,在註冊成功以後建立tcp連接。
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 初始化一個Channel對象並註冊到EventLoop中
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
// 如果註冊失敗,世界返回失敗的future對象
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {// 如果註冊還在進行中,需要向future對象添加一個監聽器,以便在註冊成功的時候做一些工作,監聽器實際上就是一個回調對象
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 註冊成功後仍然調用doResolveAndConnect0方法完成連接建立的過程
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
initAndRegister
仍然分為兩個步驟:
- 通過channel工廠類創建一個channel對象,通過反射獲取指定的channel類型的無參構造器,調用構造器來創建對象
- 調用init方法對channel對象進行初始化,init方法是一個抽象方法,Bootstrap和ServerBootstrap的實現不同
- 將channel註冊到EventLoopGroup中
註意看源碼中的一段註釋,這段註釋對netty的線程模型的理解很有幫助,大致意思是說:
- 如果當前的代碼是在EventLoopEvent線程中執行的,那麼代碼運行到這裡說明channel已經成功註冊到EventLoopEvent上了,此時再調用bind() 或 connect()方法肯定是沒有問題的
- 如果當前代碼不是在EventLoopEvent線程中執行的,也就是說當前線程是另外的線程,在這裡繼續調用bind() 或 connect()方法仍然是安全的,並不會由於併發引起方法執行順序的錯亂,原因是netty中一個channel只會綁定到一個線程上,所有關於這個channel的操作包括註冊,bind或connect都會以排隊任務的形式在一個線程中串列執行,這種做法也為netty規避了很多線程安全問題,從而減少了很多加鎖,同步的代碼,減少了線程之間的競爭資源導致的線程切換,側面上提高了線程執行效率。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 通過channel工廠類創建一個channel對象
channel = channelFactory.newChannel();
// 調用init方法對channel進行一些初始化的設置
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 註冊到EventLoopGroup中
ChannelFuture regFuture = config().group().register(channel);
// 如果發生異常,需要關閉已經建立的連接
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
NioSocketChannel初始化
DEFAULT_SELECTOR_PROVIDER是預設的SelectorProvider對象,這時jdk中定義的一個類,主要作用是生成選擇器selector對象和通道channel對象
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
newSocket中通過調用provider.openSocketChannel()方法創建了一個SocketChannel對象,它的預設實現是SocketChannelImpl。
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
然後經過幾次調用,最後調用了下麵的構造器,首先調用了父類AbstractNioByteChannel的構造器,
然後創建了一個SocketChannelConfig對象,這個類有點類似於門面模式,對NioSocketChannel對象和Socket對象的一些參數設置和獲取的介面進行封裝。
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
我們在接著看父類AbstractNioByteChannel的構造方法
AbstractNioByteChannel(Channel parent, SelectableChannel ch)
沒有做任何工作,直接調用了父類的構造方法,註意這裡多了一個參數SelectionKey.OP_READ,這個參數表示channel初始時的感興趣的事件,channel剛創建好之後對read事件感興趣
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp)
主要還是調用父類的構造方法
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 父類構造方法
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// 設置非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
try {
// 如果發生異常,關閉該channel
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
AbstractChannel(Channel parent)
最關鍵的初始化邏輯在這個最頂層的基類中,其中很重的兩個對象Unsafe對象和ChannelPipeline對象,前者封裝了jdk底層api的調用,後者是實現netty對事件的鏈式處理的核心類。
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 創建一個ChannelId對象,唯一標識該channel
id = newId();
// Unsafe對象,封裝了jdk底層的api調用
unsafe = newUnsafe();
// 創建一個DefaultChannelPipeline對象
pipeline = newChannelPipeline();
}
小結
前面一小節,我們主要簡單分析了一下NioSocketChannel的初始化過程,可以看到最主要的邏輯在AbstractChannel的構造方法中,這裡我們看到了兩個重要的類的創建過程。
Bootstrap.init
回到AbstractBootstrap.initAndRegister方法中,在完成通過反射調用NioSocketChannel構造方法並創建一個實例後,緊接著就要對這個新創建的Channel實例進行初始化設置工作,我們看一下Bootstrap對新創建的Channel的初始化過程:
- 向channel的Pipeline中添加一個處理器,ChannelPipeline我們可以理解為一個流水線,在這條流水線上有各種各樣的處理器,一個channel事件產生後會在這個流水線上進行傳播,依次經過所有的處理器
- 設置參數,也就是以ChannelOption為key的一些參數,可以通過DefaultChannelConfig.setOption方法看到具體可以設置哪些參數。
設置屬性
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// 向ChannelPipeline中添加一個處理器,這個處理器就是我們之前設置的處理器
p.addLast(config.handler());final Map<ChannelOption<?>, Object> options = options0(); // 設置參數,最終是通過調用SocketChannelConfig的一些參數設置介面設置參數 synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); // 設置屬性 synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } }
}
MultithreadEventLoopGroup.register
在完成channel的創建和初始化之後,我們就要將這個channel註冊到一個EventLoop中,NioNioEventLoop繼承自MultithreadEventLoopGroup, 通過調用SingleThreadEventLoop的register方法完成註冊
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
可以看到,通過next()方法選出了其中的一個EventLoop進行註冊。MultithreadEventLoopGroup是對多個真正的EventLoopGroup的封裝,每個實現了實際功能的真正的EventLoopGroup運行在一個線程內,
所以我們接下來應該看單個的EventLoopGroup的註冊方法。
SingleThreadEventLoop.register
這裡創建了一個DefaultChannelPromise對象,用於作為返回值。
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
最終調用了Unsafe的register方法將channel綁定到當前的EventLoopGroup對象上。
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
AbstractChannel.AbstractUnsafe.register
- 首先是做一些前置檢查,包括變數非空檢查,重覆註冊檢查,檢查channel類型和EventLoopGroup類型是否匹配
- 將這個channel綁定到指定的eventLoop對象上,
調用register0完成註冊
public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 做一些非空檢查 if (eventLoop == null) { throw new NullPointerException("eventLoop"); } // 如果重覆註冊,通過future對象拋出一個異常 // 一個channel只能註冊到一個EventLoopGroup對象上 if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } // 檢查channel類型和EventLoopGroup類型是否匹配 if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // 將channel內部的eventLoop成員設置為相應的對象 // 也就是將這個channel綁定到指定頂eventLoop上 AbstractChannel.this.eventLoop = eventLoop; // 這裡做了一個判斷,如果當前處於eventLoop對應的線程內,那麼直接執行代碼 // 如果當前運行的線程與eventLoop不是同一個,那麼將這個註冊的任務添加到eventLoop的任務隊列中 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
AbstractChannel.AbstractUnsafe.register0
這個方法實現了實際的註冊邏輯,
- 依然要做一些前置的設置和檢查工作,包括在註冊過程中不可取消,檢查channel是否存活,
- 調用jdk的api完成註冊。例如,對於jdk Nio的通道的註冊就是調用SelectableChannel.register(Selector sel, int ops, Object att)
- 調用所有的已添加的處理器節點的ChannelHandler.handlerAdded方法,實際上這也會調用handler.handlerRemoved方法,如果在此之前有handler被移除掉的話
- 通知future對象已經註冊成功了
- 觸發一個channel註冊成功的事件,這個事件會在pipeline中傳播,所有註冊的handler會依次接收到該事件並作出相應的處理
如果是第一次註冊,還需要觸發一個channel存活的事件,讓所有的handler作出相應的處理
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop // 將ChannelPromise設置為不可取消,並檢查channel是否還存活,通過內部的jdk的channel檢查是否存活 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // 是否第一次註冊, // TODO 說明情況下會註冊多次?? boolean firstRegistration = neverRegistered; // 完成實際的註冊,即底層api的調用 // 如果對於jdk Nio的通道的註冊就是調用SelectableChannel.register(Selector sel, int ops, Object att) doRegister(); // 更新標誌變數 neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. // 調用所有的已添加的處理器節點的ChannelHandler.handlerAdded方法 pipeline.invokeHandlerAddedIfNeeded(); // 通過future對象已經註冊成功了 safeSetSuccess(promise); // 觸發一個channel註冊成功的事件,這個事件會在pipeline中傳播, // 所有註冊的handler會依次接收到該事件並作出相應的處理 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { // 如果是第一次註冊,還需要觸發一個channel存活的事件,讓所有的handler作出相應的處理 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 // 開始接收讀事件 // 對於Nio類型的channel, 通過調用jdk的相關api註冊讀事件為感興趣的事件 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
小結
到此,我們就完成了對channel的創建,初始化,和註冊到EventLoop過程的分析,整個過程看下來,其實並不複雜,只不過代碼的嵌套比較深,繼承結構複雜,有些簡單的功能可能要看好幾層才能找到真正實現的地方,所以還需要耐心和熟悉。這裡,我把主幹邏輯再提煉一下,去掉所有細枝末節的邏輯,一遍能有一個整體的認識:
- 首先通過反射創建了一個NioSocketChannel(通過反射調用無參構造器)
- 然後對channel對象進行初始化,主要是想這個channel的ChannelPipeline中添加用戶設置的handler
- 最後將這個channel註冊到一個EventLoop上,註冊過程設計jdk底層的selector註冊api的調用,調用handler的回調方法,在channelPipeline中觸發一個channel註冊的事件,這些事件最終回調各個handler對象的channelRegistered方法。
接下來,我們回到Bootstrap.doResolveAndConnect方法中,繼續完成建立連接的過程的分析。
Bootstrap.doResolveAndConnect0
連接的建立在方法doResolveAndConnect0中實現:
這個方法的主要工作就是對遠程地址進行解析,比如通過dns伺服器對功能變數名稱進行解析,
然後使用解析後的地址進行連接的建立,連接建立調用doConnect方法
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
// 獲取一個地址解析器
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
// 如果解析器不支持該地址或者改地址已經被解析過了,那麼直接開始創建連接
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// Resolver has no idea about what to do with the specified remote address or it's resolved already.
doConnect(remoteAddress, localAddress, promise);
return promise;
}
// 對遠程地址進行解析
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
// Failed to resolve immediately
channel.close();
promise.setFailure(resolveFailureCause);
} else {
// Succeeded to resolve immediately; cached? (or did a blocking lookup)
// 解析成功後進行連接
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
// Wait until the name resolution is finished.
// 給future對象添加一個回調,採用非同步方法進行連接,
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
Bootstrap.doConnect
調用channel的connect方法完成連接過程。
也許是之前看scala代碼習慣了,回過頭來看java代碼感覺很冗餘,一大堆代碼就表達了那一點邏輯,感覺信息密度太低,現在有很多人認為java會漸漸的沒落,而最優可能取代java的語言中,scala絕對是強有力的競爭者之一,沒有對比就沒有傷害,跟java比,scala語言真的是簡潔太多了,幾句話就能把所要表達的邏輯精準而又直接地表達出來。好像向聲明式編程更靠近了一點。
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
// 調用 channel.connect方法完成連接
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
AbstractChannel.connect
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
DefaultChannelPipeline.connect
這裡稍微說明一下,tail是整個鏈條的尾節點,如果對netty比較熟悉的話,應該知道netty對於io事件的處理採用責任鏈的模式,即用戶可以設置多個處理器,這些處理器組成一個鏈條,io事件在這個鏈條上傳播,被特定的一些處理器所處理,而其中有兩個特殊的處理器head和tail,他們分別是這個鏈條的頭和尾,他們的存在主要是為了實現一些特殊的邏輯。
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
AbstractChannelHandlerContext.connect
中間經過幾個調用之後,最終調用該方法。這裡有一句關鍵代碼findContextOutbound(MASK_CONNECT),這個方法的代碼我就不貼了,大概說一下它的作用,更為具體的機制等後面分析Channelpipeline是在詳細說明。這個方法會在處理器鏈中從後向前遍歷,直到找到能夠處理connect事件的處理器,能否處理某種類型的事件是通過比特位判斷的,每個AbstractChannelHandlerContext對象內部有一個int型變數用於存儲標誌各種類型事件的比特位。一般,connect事件會有頭結點head來處理,也就是DefaultChannelPipeline.HeadContext類,所以我們直接看DefaultChannelPipeline.HeadContext.connect方法
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// 找到下一個能夠進行connect操作的,這裡用比特位來標記各種不同類型的操作,
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 調用AbstractChannelHandlerContext.invokeConnect
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
DefaultChannelPipeline.HeadContext.connect
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
unsafe對象的賦值:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
所以我們直接看unsafe.connect
AbstractNioChannel.connect
主要邏輯:
- 狀態檢查,非空檢查
- 調用doConnect方法進行連接
- 如果立即就連接成功了,那麼將future對象設置為成功
- 如果超時大於0,會提交一個延遲調度的任務,在超時時間到達後執行這個任務檢查是否連接成功,如果為連接成功連接說明連接超時,需要關閉通道
- 向future對象添加一個回調,在future被外部調用者取消時將通道關閉
可見建立連接的核心方法是doConnect,這是一個抽象方法,我們看NioSocketChannel,也就是tcp連接的建立過程,查看AbstractNioChannel的實現類發現還有UDP,SCTP等協議
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// 檢查promise狀態,channel存活狀態
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
// 防止重覆連接
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
// 調用doConnect方法進行連接
if (doConnect(remoteAddress, localAddress)) {
// 如果立即就連接成功了,那麼將future對象設置為成功
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
// 如果超時大於0,那麼會在超時到達後檢查是否連接成功
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
// 如果connectPromise能夠標記為失敗,說明此時還沒有連接成功,也就是連接超時了
// 此時需要關閉該通道
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// 向future對象添加一個回調,在future被外部調用者取消時將通道關閉
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
NioSocketChannel.doConnect
- 首先綁定指定的本地地址
調用SocketUtils.connect建立連接
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
// 綁定指定的本地地址
if (localAddress != null) {
doBind0(localAddress);
}// 這個變數標記建立連接的動作是否發起成功 // 成功發起建立連接的工作並不表示連接已經成功建立 boolean success = false; try { // 實際建立連接的語句 boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; // 返回連接是否已經成功建立 return connected; } finally { if (!success) { doClose(); } }
}
SocketUtils.connect
可以看到,最終是通過調用jdk的api來實現連接的建立,也就是SocketChannel.connect方法
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
// 調用jdk api建立連接,SocketChannel.connect
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
總結
一句話,這代碼是真的很深! 非常不直接,初次看的話,如果沒有一個代碼框架圖在旁邊參考,很容易迷失在層層的繼承結構中,很多代碼層層調用,真正有用的邏輯隱藏的很深,所以看這中代碼必須要有耐心,有毅力,要有打破砂鍋問到底的決心。不過這樣的複雜的代碼結構好處也是顯而易見的,那就是良好的擴展性,你可以在任意層級進行擴展。
總結一下建立連接的過程,我認為可以歸結為三個主要的方面:
- 第一, 實際建立邏輯的代碼肯定還是jdk api
- 第二,這麼多方法調用,主要的作用就是迎合框架的要求,本質上是為了代碼的擴展性,比如ChannelPipeline的處理器鏈
- 第三,另一個主要的工作就是對future對象的處理,這時實現非同步的重要手段,future對象也是外部調用者和對象內部狀態之間的連接紐帶,調用者通過future對象完成一些功能,如查狀態,發出取消動作,實現阻塞等待等。