Netty 概述 1、什麼是 Netty Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protoc ...
Netty 概述
1、什麼是 Netty
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty 是一個非同步的、基於事件驅動的網路應用框架,用於快速開發可維護、高性能的網路伺服器和客戶端
註意:netty的非同步還是基於多路復用的,並沒有實現真正意義上的非同步IO
2、Netty 的優勢
如果使用傳統 NIO,其工作量大,bug 多
- 需要自己構建協議
- 解決 TCP 傳輸問題,如粘包、半包
- 因為 bug 的存在,epoll 空輪詢導致 CPU 100%
Netty 對 API 進行增強,使之更易用,如
- FastThreadLocal => ThreadLocal
- ByteBuf => ByteBuffer
3、入門案例
1、伺服器端代碼
public class HelloServer {
public static void main(String[] args) {
// 1、啟動器,負責裝配netty組件,啟動伺服器
new ServerBootstrap()
// 2、創建 NioEventLoopGroup,可以簡單理解為 線程池 + Selector
.group(new NioEventLoopGroup())
// 3、選擇伺服器的 ServerSocketChannel 實現
.channel(NioServerSocketChannel.class)
// 4、child 負責處理讀寫,該方法決定了 child 執行哪些操作
// ChannelInitializer 處理器(僅執行一次)
// 它的作用是待客戶端 SocketChannel 建立連接後,執行 initChannel 以便添加更多的處理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 5、SocketChannel的處理器,使用StringDecoder解碼,ByteBuf=>String
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 6、SocketChannel的業務處理,使用上一個處理器的處理結果
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
});
}
// 7、ServerSocketChannel綁定8080埠
}).bind(8080);
}
}
2、客戶端代碼
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
// 選擇客戶 Socket 實現類,NioSocketChannel 表示基於 NIO 的客戶端實現
.channel(NioSocketChannel.class)
// ChannelInitializer 處理器(僅執行一次)
// 它的作用是待客戶端SocketChannel建立連接後,執行initChannel以便添加更多的處理器
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 消息會經過通道 handler 處理,這裡是將 String => ByteBuf 編碼發出
channel.pipeline().addLast(new StringEncoder());
}
})
// 指定要連接的伺服器和埠
.connect(new InetSocketAddress("localhost", 8080))
// Netty 中很多方法都是非同步的,如 connect
// 這時需要使用 sync 方法等待 connect 建立連接完畢
.sync()
// 獲取 channel 對象,它即為通道抽象,可以進行數據讀寫操作
.channel()
// 寫入消息並清空緩衝區
.writeAndFlush("hello world");
}
}
3、運行流程
左:客戶端 右:伺服器端
組件解釋
- channel 可以理解為數據的通道
- msg 理解為流動的數據,最開始輸入是 ByteBuf,但經過 pipeline 中的各個 handler 加工,會變成其它類型對象,最後輸出又變成 ByteBuf
- handler 可以理解為數據的處理工序
-
工序有多道,合在一起就是 pipeline(傳遞途徑),pipeline 負責發佈事件(讀、讀取完成…)傳播給每個 handler, handler 對自己感興趣的事件進行處理(重寫了相應事件處理方法)
- pipeline 中有多個 handler,處理時會依次調用其中的 handler
-
handler 分 Inbound 和 Outbound 兩類
- Inbound 入站
-
Outbound 出站
-
- eventLoop 可以理解為處理數據的工人
- eventLoop 可以管理多個 channel 的 io 操作,並且一旦 eventLoop 負責了某個 channel,就會將其與 channel 進行綁定,以後該 channel 中的 io 操作都由該 eventLoop 負責
- eventLoop 既可以執行 io 操作,也可以進行任務處理,每個 eventLoop 有自己的任務隊列,隊列里可以堆放多個 channel 的待處理任務,任務分為普通任務、定時任務
- eventLoop 按照 pipeline 順序,依次按照 handler 的規劃(代碼)處理數據,可以為每個 handler 指定不同的 eventLoop
1、EventLoop
事件迴圈對象 EventLoop
EventLoop 本質是一個單線程執行器(同時維護了一個 Selector),裡面有 run 方法處理一個或多個 Channel 上源源不斷的 io 事件
它的繼承關係如下
-
繼承自 j.u.c.ScheduledExecutorService 因此包含了線程池中所有的方法
-
繼承自 netty 自己的 OrderedEventExecutor
-
提供了 boolean inEventLoop (Thread thread) 方法判斷一個線程是否屬於此 EventLoop
-
提供了 EventLoopGroup parent () 方法來看看自己屬於哪個 EventLoopGroup
-
事件迴圈組 EventLoopGroup
EventLoopGroup 是一組 EventLoop,Channel 一般會調用 EventLoopGroup 的 register 方法來綁定其中一個 EventLoop,後續這個 Channel 上的 io 事件都由此 EventLoop 來處理(保證了 io 事件處理時的線程安全)
- 繼承自 netty 自己的 EventExecutorGroup
- 實現了 Iterable 介面提供遍歷 EventLoop 的能力
- 另有 next 方法獲取集合中下一個 EventLoop
1.1 處理普通與定時任務
public class TestEventLoop {
public static void main(String[] args) {
// 創建擁有兩個EventLoop的NioEventLoopGroup,對應兩個線程
EventLoopGroup group = new NioEventLoopGroup(2);
// 通過next方法可以獲得下一個 EventLoop
System.out.println(group.next());
System.out.println(group.next());
// 通過EventLoop執行普通任務
group.next().execute(()->{
System.out.println(Thread.currentThread().getName() + " hello");
});
// 通過EventLoop執行定時任務
group.next().scheduleAtFixedRate(()->{
System.out.println(Thread.currentThread().getName() + " hello2");
}, 0, 1, TimeUnit.SECONDS);
// 優雅地關閉
group.shutdownGracefully();
}
}
輸出結果如下
io.netty.channel.nio.NioEventLoop@7bb11784
io.netty.channel.nio.NioEventLoop@33a10788
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
關閉 EventLoopGroup
優雅關閉 shutdownGracefully 方法。該方法會首先切換 EventLoopGroup 到關閉狀態從而拒絕新的任務的加入,然後在任務隊列的任務都處理完成後,停止線程的運行。從而確保整體應用是在正常有序的狀態下退出的
1.2 處理 IO 任務
伺服器代碼
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}
客戶端代碼
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
// 此處打斷點調試,調用 channel.writeAndFlush(...);
System.in.read();
}
}
1.3 分工
Bootstrap 的 group () 方法可以傳入兩個 EventLoopGroup 參數,分別負責處理不同的事件
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
// 兩個Group,分別為Boss 負責Accept事件,Worker 負責讀寫事件
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
...
}
}
多個客戶端分別發送 hello 結果
nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4
可以看出,一個 EventLoop 可以負責多個 Channel,且 EventLoop 一旦與 Channel 綁定,則一直負責處理該 Channel 中的事件
增加自定義 EventLoopGroup
當有的任務需要較長的時間處理時,可以使用非 NioEventLoopGroup,避免同一個 NioEventLoop 中的其他 Channel 在較長的時間內都無法得到處理
public class MyServer {
public static void main(String[] args) {
// 增加自定義的非NioEventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 增加兩個handler,第一個使用NioEventLoopGroup處理,第二個使用自定義EventLoopGroup處理
socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
// 調用下一個handler
ctx.fireChannelRead(msg);
}
})
// 該handler綁定自定義的Group
.addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}
啟動四個客戶端發送數據
nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4
可以看出,客戶端與伺服器之間的事件,被 nioEventLoopGroup 和 defaultEventLoopGroup 分別處理
切換的實現
不同的 EventLoopGroup 切換的實現原理如下
由上面的圖可以看出,當 handler 中綁定的 Group 不同時,需要切換 Group 來執行不同的任務
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 獲得下一個EventLoop, excutor 即為 EventLoopGroup
EventExecutor executor = next.executor();
// 如果下一個EventLoop 在當前的 EventLoopGroup中
if (executor.inEventLoop()) {
// 使用當前 EventLoopGroup 中的 EventLoop 來處理任務
next.invokeChannelRead(m);
} else {
// 否則讓另一個 EventLoopGroup 中的 EventLoop 來創建任務並執行
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 如果兩個 handler 綁定的是同一個 EventLoopGroup,那麼就直接調用
- 否則,把要調用的代碼封裝為一個任務對象,由下一個 handler 的 EventLoopGroup 來調用
2、Channel
Channel 的常用方法
- close () 可以用來關閉 Channel
- closeFuture () 用來處理 Channel 的關閉
- sync 方法作用是同步等待 Channel 關閉
- 而 addListener 方法是非同步等待 Channel 關閉
- pipeline () 方法用於添加處理器
- write () 方法將數據寫入
- 因為緩衝機制,數據被寫入到 Channel 中以後,不會立即被髮送
- 只有當緩衝滿了或者調用了 flush () 方法後,才會將數據通過 Channel 發送出去
- writeAndFlush () 方法將數據寫入並立即發送(刷出)
2.1 ChannelFuture
連接問題
拆分客戶端代碼
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 該方法為非同步非阻塞方法,主線程調用後不會被阻塞,真正去執行連接操作的是NIO線程
// NIO線程:NioEventLoop 中的線程
.connect(new InetSocketAddress("localhost", 8080));
// 該方法用於等待連接真正建立
channelFuture.sync();
// 獲取客戶端-伺服器之間的Channel對象
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
System.in.read();
}
}
如果我們去掉 channelFuture.sync() 方法,會伺服器無法收到 hello world
這是因為建立連接 (connect) 的過程是 非同步非阻塞 的,若不通過 sync() 方法阻塞主線程,等待連接真正建立,這時通過 channelFuture.channel () 拿到的 Channel 對象,並不是真正與伺服器建立好連接的 Channel,也就沒法將信息正確的傳輸給伺服器端
所以需要通過 channelFuture.sync() 方法,阻塞主線程,同步處理結果,等待連接真正建立好以後,再去獲得 Channel 傳遞數據。使用該方法,獲取 Channel 和發送數據的線程 都是主線程
下麵還有一種方法,用於 非同步 獲取建立連接後的 Channel 和發送數據,使得執行這些操作的線程是 NIO 線程(去執行 connect 操作的線程)
addListener 方法
通過這種方法可以在 NIO 線程中獲取 Channel 併發送數據,而不是在主線程中執行這些操作
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 該方法為非同步非阻塞方法,主線程調用後不會被阻塞,真正去執行連接操作的是NIO線程
// NIO線程:NioEventLoop 中的線程
.connect(new InetSocketAddress("localhost", 8080));
// 當connect方法執行完畢後,也就是連接真正建立後
// 會在NIO線程中調用operationComplete方法
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
}
});
System.in.read();
}
}
處理關閉
public class ReadClient {
public static void main(String[] args) throws InterruptedException {
// 創建EventLoopGroup,使用完畢後關閉
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync();
Channel channel = channelFuture.channel();
Scanner scanner = new Scanner(System.in);
// 創建一個線程用於輸入並向伺服器發送
new Thread(()->{
while (true) {
String msg = scanner.next();
if ("q".equals(msg)) {
// 關閉操作是非同步的,在NIO線程中執行
channel.close();
break;
}
channel.writeAndFlush(msg);
}
}, "inputThread").start();
// 獲得closeFuture對象
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("waiting close...");
// 同步等待NIO線程執行完close操作
closeFuture.sync();
// 關閉之後執行一些操作,可以保證執行的操作一定是在channel關閉以後執行的
System.out.println("關閉之後執行一些額外操作...");
// 關閉EventLoopGroup
group.shutdownGracefully();
}
}
關閉channel
當我們要關閉 channel 時,可以調用 channel.close () 方法進行關閉。但是該方法也是一個非同步方法。真正的關閉操作並不是在調用該方法的線程中執行的,而是在 NIO 線程中執行真正的關閉操作
如果我們想在 channel 真正關閉以後,執行一些額外的操作,可以選擇以下兩種方法來實現
- 通過 channel.closeFuture () 方法獲得對應的 ChannelFuture 對象,然後調用 sync () 方法阻塞執行操作的線程,等待 channel 真正關閉後,再執行其他操作
// 獲得closeFuture對象
ChannelFuture closeFuture = channel.closeFuture();
// 同步等待NIO線程執行完close操作
closeFuture.sync();
- 調用 closeFuture.addListener 方法,添加 close 的後續操作
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 等待channel關閉後才執行的操作
System.out.println("關閉之後執行一些額外操作...");
// 關閉EventLoopGroup
group.shutdownGracefully();
}
});
3、Future 與 Promise
3.1 概念
netty 中的 Future 與 jdk 中的 Future 同名,但是是兩個介面
netty 的 Future 繼承自 jdk 的 Future,而 Promise 又對 netty Future 進行了擴展
-
jdk Future 只能同步等待任務結束(或成功、或失敗)才能得到結果
-
netty Future 可以同步等待任務結束得到結果,也可以非同步方式得到結果,但都是要等任務結束
-
netty Promise 不僅有 netty Future 的功能,而且脫離了任務獨立存在,只作為兩個線程間傳遞結果的容器
3.2 JDK Future
public class JdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "JdkFuture");
}
};
// 創建線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);
// 獲得Future對象
Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(1);
return 50;
}
});
// 通過阻塞的方式,獲得運行結果
System.out.println(future.get());
}
}
3.3 Netty Future
public class NettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
// 獲得 EventLoop 對象
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 50;
}
});
// 主線程中獲取結果
System.out.println(Thread.currentThread().getName() + " 獲取結果");
System.out.println("getNow " + future.getNow());
System.out.println("get " + future.get());
// NIO線程中非同步獲取結果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(Thread.currentThread().getName() + " 獲取結果");
System.out.println("getNow " + future.getNow());
}
});
}
}
運行結果
main 獲取結果
getNow null
get 50
nioEventLoopGroup-2-1 獲取結果
getNow 50
Netty 中的 Future 對象,可以通過 EventLoop 的 sumbit () 方法得到
-
可以通過 Future 對象的 get 方法,阻塞地獲取返回結果
-
也可以通過 getNow 方法,獲取結果,若還沒有結果,則返回 null,該方法是非阻塞的
-
還可以通過 future.addListener 方法,在 Callable 方法執行的線程中,非同步獲取返回結果
3.4 Netty Promise
Promise 相當於一個容器,可以用於存放各個線程中的結果,然後讓其他線程去獲取該結果
public class NettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 創建EventLoop
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
// 創建Promise對象,用於存放結果
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 自定義線程向Promise中存放結果
promise.setSuccess(50);
}).start();
// 主線程從Promise中獲取結果
System.out.println(Thread.currentThread().getName() + " " + promise.get());
}
}
4、Handler 與 Pipeline
4.1 Pipeline
public class PipeLineServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 在socketChannel的pipeline中添加handler
// pipeline中handler是帶有head與tail節點的雙向鏈表,的實際結構為
// head <-> handler1 <-> ... <-> handler4 <->tail
// Inbound主要處理入站操作,一般為讀操作,發生入站操作時會觸發Inbound方法
// 入站時,handler是從head向後調用的
socketChannel.pipeline().addLast("handler1" ,new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
// 父類該方法內部會調用fireChannelRead
// 將數據傳遞給下一個handler
super.channelRead(ctx, msg);
}
});
socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(Thread.currentThread().getName() + " Inbound handler 2");
// 執行write操作,使得Outbound的方法能夠得到調用
socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));
super.channelRead(ctx, msg);
}
});
// Outbound主要處理出站操作,一般為寫操作,發生出站操作時會觸發Outbound方法
// 出站時,handler的調用是從tail向前調用的
socketChannel.pipeline().addLast("handler3" ,new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
super.write(ctx, msg, promise);
}
});
socketChannel.pipeline().addLast("handler4" ,new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
運行結果如下
nioEventLoopGroup-2-2 Inbound handler 1
nioEventLoopGroup-2-2 Inbound handler 2
nioEventLoopGroup-2-2 Outbound handler 2
nioEventLoopGroup-2-2 Outbound handler 1
通過 channel.pipeline ().addLast (name, handler) 添加 handler 時,記得給 handler 取名字。這樣可以調用 pipeline 的 addAfter、addBefore 等方法更靈活地向 pipeline 中添加 handler
handler 需要放入通道的 pipeline 中,才能根據放入順序來使用 handler
- pipeline 是結構是一個帶有 head 與 tail 指針的雙向鏈表,其中的節點為 handler
- 要通過 ctx.fireChannelRead (msg) 等方法,將當前 handler 的處理結果傳遞給下一個 handler
- 當有 入站(Inbound)操作時,會從 head 開始向後 調用 handler,直到 handler 不是處理 Inbound 操作為止
- 當有 出站(Outbound)操作時,會從 tail 開始向前 調用 handler,直到 handler 不是處理 Outbound 操作為止
具體結構如下
調用順序如下
4.2 OutboundHandler
socketChannel.writeAndFlush()
當 handler 中調用該方法進行寫操作時,會觸發 Outbound 操作,此時是從 tail 向前尋找 OutboundHandler
ctx.writeAndFlush()
當 handler 中調用該方法進行寫操作時,會觸發 Outbound 操作,此時是從當前 handler 向前尋找 OutboundHandler
4.3 EmbeddedChannel
EmbeddedChannel 可以用於測試各個 handler,通過其構造函數按順序傳入需要測試 handler,然後調用對應的 Inbound 和 Outbound 方法即可
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("4");
super.write(ctx, msg, promise);
}
};
// 用於測試Handler的Channel
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 執行Inbound操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
// 執行Outbound操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
}
}
5、ByteBuf
調試工具方法
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
該方法可以幫助我們更為詳細地查看 ByteBuf 中的內容
5.1 創建
public class ByteBufStudy {
public static void main(String[] args) {
// 創建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
ByteBufUtil.log(buffer);
// 向buffer中寫入數據
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 20; i++) {
sb.append("a");
}
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
// 查看寫入結果
ByteBufUtil.log(buffer);
}
}
運行結果
read index:0 write index:0 capacity:16
read index:0 write index:20 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61 |aaaa |
+--------+-------------------------------------------------+----------------+
- ByteBuf 通過 ByteBufAllocator 選擇 allocator 並調用對應的 buffer () 方法來創建的 ,預設使用 直接記憶體 作為 ByteBuf,容量為 256 個位元組,可以指定初始容量的大小
- 當 ByteBuf 的容量無法容納所有數據時,ByteBuf 會進行擴容操作
- 如果在 handler 中創建 ByteBuf,建議使用 ChannelHandlerContext ctx.alloc ().buffer () 來創建
5.2 直接記憶體與堆記憶體
通過該方法創建的 ByteBuf,使用的是基於直接記憶體的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
可以使用下麵的代碼來創建池化 基於堆 的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
也可以使用下麵的代碼來創建池化基於直接記憶體的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
- 直接記憶體創建和銷毀的代價昂貴,但讀寫性能高(少一次記憶體複製),適合配合池化功能一起用
- 直接記憶體對 GC 壓力小,因為這部分記憶體不受 JVM 垃圾回收的管理,但也要註意及時主動釋放
驗證
public class ByteBufStudy {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println(buffer.getClass());
buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
System.out.println(buffer.getClass());
buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
System.out.println(buffer.getClass());
}
}
// 使用池化的直接記憶體
class io.netty.buffer.PooledUnsafeDirectByteBuf
// 使用池化的堆記憶體
class io.netty.buffer.PooledUnsafeHeapByteBuf
// 使用池化的直接記憶體
class io.netty.buffer.PooledUnsafeDirectByteBuf
5.3 池化與非池化
池化的最大意義在於可以重用 ByteBuf,優點有
- 沒有池化,則每次都得創建新的 ByteBuf 實例,這個操作對直接記憶體代價昂貴,就算是堆記憶體,也會增加 GC 壓力
- 有了池化,則可以重用池中 ByteBuf 實例,並且採用了與 jemalloc 類似的記憶體分配演算法提升分配效率
- 高併發時,池化功能更節約記憶體,減少記憶體溢出的可能
池化功能是否開啟,可以通過下麵的系統環境變數來設置
-Dio.netty.allocator.type={unpooled|pooled}
- 4.1 以後,非 Android 平臺預設啟用池化實現,Android 平臺啟用非池化實現
- 4.1 之前,池化功能還不成熟,預設是非池化實現
5.4 組成
ByteBuf 主要有以下幾個組成部分
- 最大容量與當前容量
- 在構造 ByteBuf 時,可傳入兩個參數,分別代表初始容量和最大容量,若未傳入第二個參數(最大容量),最大容量預設為 Integer.MAX_VALUE
- 當 ByteBuf 容量無法容納所有數據時,會進行擴容操作,若超出最大容量,會拋出 java.lang.IndexOutOfBoundsException 異常
- 讀寫操作不同於 ByteBuffer 只用 position 進行控制,ByteBuf 分別由讀指針和寫指針兩個指針控制。進行讀寫操作時,無需進行模式的切換
- 讀指針前的部分被稱為廢棄部分,是已經讀過的內容
- 讀指針與寫指針之間的空間稱為可讀部分
- 寫指針與當前容量之間的空間稱為可寫部分
5.5 寫入
常用方法如下
註意
- 這些方法的未指明返回值的,其返回值都是 ByteBuf,意味著可以鏈式調用來寫入不同的數據
- 網路傳輸中,預設習慣是 Big Endian,使用 writeInt (int value)
使用方法
public class ByteBufStudy {
public static void main(String[] args) {
// 創建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
ByteBufUtil.log(buffer);
// 向buffer中寫入數據
buffer.writeBytes(new byte[]{1, 2, 3, 4});
ByteBufUtil.log(buffer);
buffer.writeInt(5);
ByteBufUtil.log(buffer);
buffer.writeIntLE(6);
ByteBufUtil.log(buffer);
buffer.writeLong(7);
ByteBufUtil.log(buffer);
}
}
運行結果
read index:0 write index:0 capacity:16
read index:0 write index:4 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
read index:0 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 |............ |
+--------+-------------------------------------------------+----------------+
read index:0 write index:20 capacity:20
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07 |.... |
+--------+-------------------------------------------------+----------------+
還有一類方法是 set 開頭的一系列方法,也可以寫入數據,但不會改變寫指針位置
5.6 擴容
當 ByteBuf 中的容量無法容納寫入的數據時,會進行擴容操作
buffer.writeLong(7);
ByteBufUtil.log(buffer);
// 擴容前
read index:0 write index:12 capacity:16
...
// 擴容後
read index:0 write index:20 capacity:20
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07 |.... |
+--------+-------------------------------------------------+----------------+
擴容規則
-
如何寫入後數據大小未超過 512 位元組,則選擇下一個 16 的整數倍進行擴容
- 例如寫入後大小為 12 位元組,則擴容後 capacity 是 16 位元組
-
如果寫入後數據大小超過 512 位元組,則選擇下一個 2^n
-
例如寫入後大小為 513 位元組,則擴容後 capacity 是 210=1024 位元組(29=512 已經不夠了)
-
擴容不能超過 maxCapacity,否則會拋出 java.lang.IndexOutOfBoundsException 異常
Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)
5.7 讀取
讀取主要是通過一系列 read 方法進行讀取,讀取時會根據讀取數據的位元組數移動讀指針
如果需要 重覆讀取 ,需要調用 buffer.markReaderIndex() 對讀指針進行標記,並通過 buffer.resetReaderIndex() 將讀指針恢復到 mark 標記的位置
public class ByteBufStudy {
public static void main(String[] args) {
// 創建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
// 向buffer中寫入數據
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);
// 讀取4個位元組
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
ByteBufUtil.log(buffer);
// 通過mark與reset實現重覆讀取
buffer.markReaderIndex();
System.out.println(buffer.readInt());
ByteBufUtil.log(buffer);
// 恢復到mark標記處
buffer.resetReaderIndex();
ByteBufUtil.log(buffer);
}
}
read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16
read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+
還有以 get 開頭的一系列方法,這些方法不會改變讀指針的位置
5.8 釋放
由於 Netty 中有堆外記憶體(直接記憶體)的 ByteBuf 實現,堆外記憶體最好是手動來釋放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 記憶體,只需等 GC 回收記憶體即可
- UnpooledDirectByteBuf 使用的就是直接記憶體了,需要特殊的方法來回收記憶體
- PooledByteBuf 和它的子類使用了池化機制,需要更複雜的規則來回收記憶體
Netty 這裡採用了引用計數法來控制回收記憶體,每個 ByteBuf 都實現了 ReferenceCounted 介面
- 每個 ByteBuf 對象的初始計數為 1
- 調用 release 方法計數減 1,如果計數為 0,ByteBuf 記憶體被回收
- 調用 retain 方法計數加 1,表示調用者沒用完之前,其它 handler 即使調用了 release 也不會造成回收
- 當計數為 0 時,底層記憶體會被回收,這時即使 ByteBuf 對象還在,其各個方法均無法正常使用
釋放規則
因為 pipeline 的存在,一般需要將 ByteBuf 傳遞給下一個 ChannelHandler,如果在每個 ChannelHandler 中都去調用 release ,就失去了傳遞性(如果在這個 ChannelHandler 內這個 ByteBuf 已完成了它的使命,那麼便無須再傳遞)
基本規則是,誰是最後使用者,誰負責 release
-
起點,對於 NIO 實現來講,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次創建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead (byteBuf))
-
入站 ByteBuf 處理原則
- 對原始 ByteBuf 不做處理,調用 ctx.fireChannelRead (msg) 向後傳遞,這時無須 release
- 將原始 ByteBuf 轉換為其它類型的 Java 對象,這時 ByteBuf 就沒用了,必須 release
- 如果不調用 ctx.fireChannelRead (msg) 向後傳遞,那麼也必須 release
- 註意各種異常,如果 ByteBuf 沒有成功傳遞到下一個 ChannelHandler,必須 release
- 假設消息一直向後傳,那麼 TailContext 會負責釋放未處理消息(原始的 ByteBuf)
-
出站 ByteBuf 處理原則
- 出站消息最終都會轉為 ByteBuf 輸出,一直向前傳,由 HeadContext flush 後 release
-
異常處理原則
- 有時候不清楚 ByteBuf 被引用了多少次,但又必須徹底釋放,可以迴圈調用 release 直到返回 true
while (!buffer.release()) {}
當 ByteBuf 被傳到了 pipeline 的 head 與 tail 時,ByteBuf 會被其中的方法徹底釋放,但前提是 ByteBuf 被傳遞到了 head 與 tail 中
TailConext 中釋放 ByteBuf 的源碼
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
// 具體的釋放方法
ReferenceCountUtil.release(msg);
}
}
判斷傳過來的是否為 ByteBuf,是的話才需要釋放
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
5.9 切片
ByteBuf 切片是【零拷貝】的體現之一,對原始 ByteBuf 進行切片成多個 ByteBuf,切片後的 ByteBuf 並沒有發生記憶體複製,還是使用原始 ByteBuf 的記憶體,切片後的 ByteBuf 維護獨立的 read,write 指針
得到分片後的 buffer 後,要調用其 retain 方法,使其內部的引用計數加一。避免原 ByteBuf 釋放,導致切片 buffer 無法使用修改原 ByteBuf 中的值,也會影響切片後得到的 ByteBuf
public class TestSlice {
public static void main(String[] args) {
// 創建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
// 向buffer中寫入數據
buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
// 將buffer分成兩部分
ByteBuf slice1 = buffer.slice(0, 5);
ByteBuf slice2 = buffer.slice(5, 5);
// 需要讓分片的buffer引用計數加一
// 避免原Buffer釋放導致分片buffer無法使用
slice1.retain();
slice2.retain();
ByteBufUtil.log(slice1);
ByteBufUtil.log(slice2);
// 更改原始buffer中的值
System.out.println("===========修改原buffer中的值===========");
buffer.setByte(0,5);
System.out.println("===========列印slice1===========");
ByteBufUtil.log(slice1);
}
}
運行結果
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a |..... |
+--------+-------------------------------------------------+----------------+
===========修改原buffer中的值===========
===========列印slice1===========
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+
5.10 優勢
- 池化思想 - 可以重用池中 ByteBuf 實例,更節約記憶體,減少記憶體溢出的可能
- 讀寫指針分離,不需要像 ByteBuffer 一樣切換讀寫模式
- 可以自動擴容
- 支持鏈式調用,使用更流暢
- 很多地方體現零拷貝,例如
- slice、duplicate、CompositeByteBuf
本文由
傳智教育博學谷
教研團隊發佈。如果本文對您有幫助,歡迎
關註
和點贊
;如果您有任何建議也可留言評論
或私信
,您的支持是我堅持創作的動力。轉載請註明出處!