1. Netty源碼研究筆記(1)——開篇 1.1. Netty介紹 Netty是一個老牌的高性能網路框架。在眾多開源框架中都有它的身影,比如:grpc、dubbo、seata等。 裡面有著非常多值得學的東西: I/O模型 記憶體管理 各種網路協議的實現:http、redis、websocket等等 ...
1. Netty源碼研究筆記(1)——開篇
1.1. Netty介紹
Netty是一個老牌的高性能網路框架。在眾多開源框架中都有它的身影,比如:grpc、dubbo、seata等。
裡面有著非常多值得學的東西:
-
I/O模型
-
記憶體管理
-
各種網路協議的實現:http、redis、websocket等等
-
各種各樣有趣的技巧的實現:非同步、時間輪、池化、記憶體泄露探測等等。
-
代碼風格、設計思想、設計原則等。
1.2. 源碼分析方法
我一般是這樣進行源碼分析的:
-
首先是縱向,通過官方提供的demo,進行debug,並記錄在一個完整的生命周期下的調用鏈上,會涉及到哪些組件。
-
然後對涉及到的組件拿出來,找出它們的頂層定義(介面、抽象類)。通過其模塊/包的劃分、類註釋、定義的方法及其註釋,來大致知曉每個組件是做什麼的,以及它們在整個框架中的位置是怎樣的。
-
第二步完成後,就可以對第一步的調用鏈流程、步驟、涉及到的組件,進行歸納、劃分,從而做到心中有數,知道東南西北了。
-
之後就是橫向,對這些歸納出來的組件體系,逐個進行分析。
-
在分析每個組件體系的時候,也是按照先縱向,再橫向的步驟:
-
首先是縱向:找出該組件體系中的核心頂層介面、類,然後結合其的所有實現類,捋出繼承樹,然後弄清楚每個類做的是啥,它是怎麼定義的,同一層級的不同實現類之間的區別大致是什麼,必要的話,可以將這個繼承樹記下來,在心中推算幾遍。
-
然後是橫向:將各個類有選擇性地拿出來分析。
-
當然,所謂的縱向,橫向,這兩個過程實際是互相交織的,也就是說整個流程不一定就分為前後兩半:前面一半都是縱向,後面一半都是橫向。
通過縱向的分析,我們能發現整個框架可以分成大致哪幾個部分,以及有
1.3. 分析前的準備
- 首先在本地建一個對應的分析學慣用的項目,比如:learn_netty,用maven管理依賴
- 然後在maven倉庫,中找到我們需要的依賴,比如這裡我用的是最新的:
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
</dependency>
- 將官方提供的demo代碼,導入到項目中。
- 學習項目搭建好之後,就嘗試編譯、運行,沒問題後,就命令行
mvn dependency:sources
命令(或者通過IDE)來下載依賴的源代碼。 - 可選:在github上,將項目同時clone到本地,如果分析中發現問題或者自己有些優化建議,可以嘗試為分析的項目貢獻代碼。
1.4. 分析示例的代碼
以一個簡單的EchoServer、EchoClient來研究。
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
new EchoServer(8083).start();
}
public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
public class EchoClient {
public static void main(String[] args) throws Exception {
connect("127.0.0.1", 8083);
}
public static void connect(String host, int port) throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = bootstrap.connect();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty Sockets!", CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println(msg.toString(CharsetUtil.UTF_8));
}
}
1.5. 開始分析
分別啟動EchoServer、EchoClient,在兩個ChannelFuture的位置打斷點。
1.5.1. EchoServer啟動調用鏈
進入ServerBootstrap
的bind
方法,發現該方法定義在父類AbstractBootstrap
中:
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
接著來看doBind
方法,發現也在AbstractBootstrap
中:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
發現doBind
中主要做了兩件事:
-
initAndRegister
(初始化Channel並註冊到EventLoop中),這個操作是非同步操作,立即返回該操作對應的句柄。 -
拿到
initAndRegister
操作的句柄後,對其進行檢查。-
如果
initAndRegister
已完成那麼立即進行doBind0
操作(實際的bind
操作),並返回doBind0
操作對應的句柄。 -
如果
initAndRegister
還沒有完成,那麼就將doBind0
操作非同步化:initAndRegister
操作完成後再觸發doBind0
。
-
然後我們先看initAndRegister
,它同樣在AbstractBootstrap
中:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
忽略對異常的處理,看到有三個步驟:
-
使用工廠創建一個
channel
-
對這個
channel
進行init
:由子類實現。 -
將創建的
channel
註冊(register
)到EventLoopGroup
中,非同步操作,將該操作對應的句柄返回。
看完了initAndRegister
後,在回來看doBind0
:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
發現在doBind0
中,最終是通過調用channel
的bind
方法來完成的。而這個動作是包裹成了一個任務,提交給了channel
所註冊到的eventloop
,由它來執行。
1.5.2. EchoClient啟動調用鏈
首先進入Bootstrap
的connect
方法中:
public ChannelFuture connect() {
validate();
SocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress == null) {
throw new IllegalStateException("remoteAddress not set");
}
return doResolveAndConnect(remoteAddress, config.localAddress());
}
同樣忽略validate
,直接看doResolveAndConnect
。
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
我們發現Bootstrap::doResolveAndConnect
和AbstractBootstrap::doBind
類似。意思也是說,在initAndRegister
完成channel
的創建、初始化、綁定到EventLoop
之後再進行實際的操作doResolveAndConnect0
。
於是我們來看doResolveAndConnect0
:
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
AddressResolver<SocketAddress> resolver;
try {
resolver = this.resolver.getResolver(eventLoop);
} catch (Throwable cause) {
channel.close();
return promise.setFailure(cause);
}
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// Resolver has no idea about what to do with the specified remote address or it's resolved already.
doConnect(remoteAddress, localAddress, promise);
return promise;
}
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
// Failed to resolve immediately
channel.close();
promise.setFailure(resolveFailureCause);
} else {
// Succeeded to resolve immediately; cached? (or did a blocking lookup)
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
// Wait until the name resolution is finished.
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
我們可以看出,doResolveAndConnect0
正如其名:
- 首先獲取
channel
所綁定的eventloop
所對應的AddressResolver
(從AddressResolverGroup
)中拿。 - 拿到
AddressResolver
之後,如果它不知道該怎麼處理給定的需要連接的地址,或者說這個地址已經被其解析過,那麼就直接doConnect
。否則使用AddressResolver
來解析需要連接的地址(非同步操作),並將doConnect
操作非同步化。
先暫時忽略AddressResolver
,我們來看doConnect
:
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
我們看到doConnect
和之前的doBind0
一樣,最終也是調用channel
的方法,並且將實際的執行交給channel
綁定的eventloop
來執行。
1.6. 總結
就目前debug的調用鏈上,我們發現涉及到的組件有:
- Bootstrap系列:腳手架,提供給開發人員使用,類似Spring的ApplicationContext
- Channel系列:連接通道
- EventLoopGroup、EventLoop系列:執行器與事件驅動迴圈,IO模型。
- AddressResolverGroup、AddressResolver系列:地址解析器
- netty自定義的Future、Promise相關:非同步化的基礎
我們發現netty的操作全程是非同步化的,並且最終要解開其原理的廬山真面目,關鍵還在於提及的eventloop、channel。
此階段的縱向分析,目前只解開一隅,待我們看看eventloop、channel後,再來解開更大的謎題。
作者: 邁吉
出處: https://www.cnblogs.com/stepfortune/
關於作者:邁吉
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出, 原文鏈接 如有問題, 可郵件([email protected])咨詢.