本節我們看看Netty的傳輸(全是乾貨,自帶水杯 # 一、Java的NIO和OIO 流經網路的數據總是具有相同的類型:位元組。這些位元組是如何流動的主要取決於我們所說的網路傳輸。 ## 1.1 OIO 我們先來看一段Java的阻塞應用程式程式: ```java package com.example.j ...
本節我們看看Netty的傳輸(全是乾貨,自帶水杯
一、Java的NIO和OIO
流經網路的數據總是具有相同的類型:位元組。這些位元組是如何流動的主要取決於我們所說的網路傳輸。
1.1 OIO
我們先來看一段Java的阻塞應用程式程式:
package com.example.javademo;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
/**
* @author lhd
* @date 2023/05/17 13:19
* @notes java oio演示
*/
public class PlainOioServer {
public void serve(int port) throws IOException {
//綁定伺服器到指定埠
final ServerSocket socket = new ServerSocket(port);
try {
for (;;) {
//接受連接
final Socket clientSocket = socket.accept();
System.out.println(
"Accepted connection from " + clientSocket);
//創建一個新的線程來處理該連接
new Thread(new Runnable() {
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
//將消息寫給新連接的客戶端
out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));
out.flush();
//關閉連接
clientSocket.close();
}
catch (IOException e) {
e.printStackTrace();
}
finally {
try {
clientSocket.close();
}
catch (IOException ex) {
// 關閉時忽略
}
}
}
}).start(); //啟動線程
}
}
catch (IOException e) {
e.printStackTrace();
}
}
}
這段代碼完全可以處理中等數量的併發客戶端。但是隨著應用程式變得流行起來,你會發現它並不能很好地伸縮到支撐成千上萬的併發連入連接。
1.2 NIO
同樣,來看一段Java的非阻塞應用程式代碼:
package com.example.javademo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author lhd
* @date 2023/05/17 13:27
* @notes java NIO演示
*/
public class PlainNioServer {
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ssocket = serverChannel.socket();
//將服務綁定到埠
InetSocketAddress address = new InetSocketAddress(port);
ssocket.bind(address);
//打開選擇器來處理Channel
Selector selector = Selector.open();
//將serverChannel註冊到選擇器用來接收連接
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (;;) {
try {
//等待需要處理的新事件;阻塞 將一直持續到下一個傳入事件
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
// handle exception
break;
}
//獲取所有接收事件的SelectionKey 實例
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
//檢查事件是否是一個新的已經就緒可以被接受的連接
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
//接受客戶端,並將它註冊到選擇器
client.register(selector, SelectionKey.OP_WRITE |
SelectionKey.OP_READ, msg.duplicate());
System.out.println("Accepted connection from " + client);
}
//檢查套接字是否已經準備好寫數據
if (key.isWritable()) {
SocketChannel client = (SocketChannel)key.channel();
ByteBuffer buffer = (ByteBuffer)key.attachment();
while (buffer.hasRemaining()) {
//將數據寫到已連接的客戶端
if (client.write(buffer) == 0) {
break;
}
}
//關閉連接
client.close();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}
可以看出,兩段Java的OIO和NIO都做了相同的事:連接客戶端,併發送“Hi”。區別在於一個是阻塞的,另一個是非阻塞的,但兩段代碼卻完全不同。如果為了用於非阻塞 I/O 而重新實現這個簡單的應用程式,都需要一次完全的重寫的話,那麼不難想象,移植真正複雜的應用程式需要付出什麼樣的努力。
二、Netty的NIO和OIO
下來我們來看以下Netty 實現該應用程式將會是什麼樣子。
2.1 OIO
來看一段Netty的阻塞代碼
package com.example.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
/**
* @author lhd
* @date 2023/05/17 13:47
* @notes Netty 阻塞程式演示
* ps:Netty已啟用OIO傳輸,使用 NIO / EPOLL / KQUEUE 傳輸。
*/
public class NettyOioServer {
public void server(int port) throws InterruptedException {
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", StandardCharsets.UTF_8));
EventLoopGroup group = new OioEventLoopGroup();
try {
//創建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
//使用 OioEventLoopGroup以允許阻塞模式
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
//指定 ChannelInitializer,對於每個已接受的連接都調用它
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch){
//添加一個 ChannelInboundHandlerAdapter 以攔截和處理事件
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//將消息寫到客戶端,並添加 ChannelFutureListener,以便消息一被寫完就關閉連接
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
//綁定伺服器以接受連接
ChannelFuture f = b.bind().sync();
//釋放所有的資源
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
ps:這段代碼和我們上一篇文章中的有所區別,不要搞混淆了
2.2 NIO
我們再來看看Netty的非阻塞代碼:
package com.example.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
/**
* @author lhd
* @date 2023/05/17 14:05
* @notes Netty 非阻塞代碼演示
*/
public class NettyNioServer {
public void server(int port) throws InterruptedException {
final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", StandardCharsets.UTF_8));
EventLoopGroup group = new NioEventLoopGroup(); // 1
try {
ServerBootstrap b = new ServerBootstrap();
//為非阻塞模式使用NioEventLoopGroup
b.group(group).channel(NioServerSocketChannel.class) // 2
.localAddress(new InetSocketAddress(port))
//指定 ChannelInitializer,對於每個已接受的連接都調用它
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
//添加 ChannelInboundHandlerAdapter 以接收和處理事件
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//將消息寫到客戶端,並添加ChannelFutureListener,以便消息一被寫完就關閉連接
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
//綁定伺服器以接受連接
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
//釋放所有的資源
group.shutdownGracefully().sync();
}
}
}
可以看到,Netty的阻塞代碼和非阻塞代碼只有兩處不同(代碼中1 2 處)。這使得Netty的阻塞和非阻塞切換非常容易,Netty 為每種傳輸的實現都暴露了相同的 API,所以無論選用哪一種傳輸的實現,你的代碼都仍然幾乎不受影響。在所有的情況下,傳輸的實現都依賴於 interface Channel、ChannelPipeline 和 ChannelHandler。
三、傳輸API
傳輸 API 的核心是 interface Channel,它被用於所有的 I/O 操作。下圖是它的層次結構:
如圖所示,每個 Channel 都將會被分配一個 ChannelPipeline和ChannelConfig。ChannelConfig 包含了該 Channel 的所有配置設置,並且支持熱更新。由於特定的傳輸可能具有獨特的設置,所以它可能會實現一個 ChannelConfig 的子類型。
由於 Channel 是獨一無二的,所以為了保證順序將 Channel 聲明為 java.lang.Comparable 的一個子介面。因此,如果兩個不同的 Channel 實例都返回了相同的散列碼,那麼 AbstractChannel 中的 compareTo()方法的實現將會拋出一個 Error。
ChannelPipeline 持有所有將應用於入站和出站數據以及事件的 ChannelHandler 實例,這些 ChannelHandler 實現了應用程式用於處理狀態變化以及數據處理的邏輯。
ChannelHandler 的典型用途包括:
- 將數據從一種格式轉換為另一種格式;
- 提供異常的通知;
- 提供 Channel 變為活動的或者非活動的通知;
- 提供當 Channel 註冊到 EventLoop 或者從 EventLoop 註銷時的通知;
- 提供有關用戶自定義事件的通知。
ps:ChannelPipeline 實現了一種常見的設計模式—攔截過濾器(InterceptingFilter)。UNIX 管道是另外一個熟悉的例子:多個命令被鏈接在一起,其中一個命令的輸出端將連接到命令行中下一個命令的輸入端。
也可以根據需要通過添加或者移除ChannelHandler實例來修改ChannelPipeline。
通過利用Netty的這項能力可以構建出高度靈活的應用程式。例如,STARTTLS方法名協議被請求時,你可以簡單地通過 向 ChannelPipeline 添 加 一個適當的 ChannelHandler(SslHandler)來按需地支持STARTTLS協議。除了訪問所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel的其他方法。
比較重要的Channel方法如下:
方法名 | 描述 |
---|---|
eventLoop | 返回分配給 Channel 的 EventLoop |
pipeline | 返回分配給 Channel 的 ChannelPipeline |
isActive | 如果 Channel 是活動的,則返回 true。活動的意義可能依賴於底層的傳輸。例如,一個 Socket 傳輸一旦連接到了遠程節點便是活動的,而一個 Datagram 傳輸一旦被打開便是活動的 |
localAddress | 返回本地的 SokcetAddress |
remoteAddress | 返回遠程的 SocketAddress |
write | 將數據寫到遠程節點。這個數據將被傳遞給 ChannelPipeline,並且排隊直到它被沖刷 |
flush | 將之前已寫的數據沖刷到底層傳輸,如一個 SocketwriteAndFlush 一個簡便的方法,等同於調用 write()並接著調用 flush() |
Netty 所提供的廣泛功能只依賴於少量的介面,所以我們可以對我們的應用程式邏輯進行重大的修改,而又無需大規模地重構你的代碼庫。
下麵是一個寫數據並將其沖刷到遠程節點的例子。
Channel channel = ...
//創建要發送的數據
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
//寫數據並沖刷它
ChannelFuture cf = channel.writeAndFlush(buf);
//添加監聽,以便完後接收通知
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
//如果成功,在控制台列印成功
if (future.isSuccess()) {
System.out.println("Write successful");
} else {
//錯誤則列印錯誤和堆棧跟蹤
System.err.println("Write error");
future.cause().printStackTrace();
}
}
});
ps:Netty 的 Channel 實現是線程安全的,因此你可以存儲一個到 Channel 的引用,並且每當你需要向遠程節點寫數據時,都可以使用它,即使當時許多線程都在使用它。
上一篇博文說過,一個Channel對應一個eventLoop,一個eventLoop持有一個線程卻不一定只持有一個Channel。下麵展示多個線程使用同一個Channel。
final Channel channel = ...
//創建數據
final ByteBuf buf = Unpooled.copiedBuffer("your data",
CharsetUtil.UTF_8).retain();
//創建寫數據的Runable
Runnable writer = new Runnable() {
@Override
public void run() {
channel.writeAndFlush(buf.duplicate());
}
};
//獲取線程池的引用
Executor executor = Executors.newCachedThreadPool();
//遞交任務給線程池,以便某個線程調用
executor.execute(writer);
//遞交另一個寫任務以便在另一個線程中執行
executor.execute(writer);
...
這些消息將會保證按照順序發送~
四、內置的傳輸
Netty 內置了一些可開箱即用的傳輸。因為並不是它們所有的傳輸都支持每一種協議,所以你必須選擇一個和你的應用程式所使用的協議相容的傳輸。
下麵是一些Netty提供的傳輸。
名 稱 | 包 | 描 述 |
---|---|---|
NIO | io.netty.channel.socket.nio | 使用 java.nio.channels 包作為基礎——基於選擇器的方式 |
Epoll | io.netty.channel.epoll | 由 JNI 驅動的 epoll()和非阻塞 IO。這個傳輸支持只有在Linux上可用的多種特性,如SO_REUSEPORT,比 NIO 傳輸更快,而且是完全非阻塞的 |
OIO | io.netty.channel.socket.oio | 使用 java.net 包作為基礎使用阻塞流 |
Local | io.netty.channel.local | 可以在 VM 內部通過管道進行通信的本地傳輸 |
Embedded | io.netty.channel.embedded | Embedded 傳輸,允許使用 ChannelHandler 而又不需要一個真正的基於網路的傳輸。這在測試你的ChannelHandler 實現時非常有用 |
之後我們回詳細介紹這些傳輸。
ps:Epoll這個是 Netty 特有的實現,更加適配 Netty 現有的線程模型,具有更高的性能以及更低的垃圾回收壓力
4.1 NIO
NIO 提供了一個所有 I/O 操作的全非同步的實現。它利用了自 NIO 子系統被引入JDK 1.4 時便可用的基於選擇器的 API。
選擇器背後的基本概念是充當一個註冊表,在那裡你將可以請求在 Channel 的狀態發生變化時得到通知。可能的狀態變化有:
- 新的 Channel 已被接受並且就緒;
- Channel 連接已經完成;
- Channel 有已經就緒的可供讀取的數據;
- Channel 可用於寫數據。
選擇器運行在一個檢查狀態變化並對其做出相應響應的線程上,在應用程式對狀態的改變做出響應之後,選擇器將會被重置,並將重覆這個過程。
下表中的常量值代表了由class java.nio.channels.SelectionKey定義的位模式。
這些位模式可以組合起來定義一組應用程式正在請求通知的狀態變化集。
名稱 | 描述 |
---|---|
OP_ACCEPT | 請求在接受新連接並創建 Channel 時獲得通知 |
OP_CONNECT | 請求在建立一個連接時獲得通知 |
OP_READ | 請求當數據已經就緒,可以從 Channel 中讀取時獲得通知 |
OP_WRITE | 請求當可以向 Channel 中寫更多的數據時獲得通知。這處理了套接字緩衝區被完全填滿時的情況,這種情況通常發生在數據的發送速度比遠程節點可處理的速度更快的時候 |
內置傳輸的處理流程圖:
ps:零拷貝(zero-copy)是一種目前只有在使用 NIO 和 Epoll 傳輸時才可使用的特性。它使你可以快速高效地將數據從文件系統移動到網路介面,而不需要將其從內核空間複製到用戶空間,其在像 FTP 或者HTTP 這樣的協議中可以顯著地提升性能。但是,並不是所有的操作系統都支持這一特性。特別地,它對於實現了數據加密或者壓縮的文件系統是不可用的——只能傳輸文件的原始內容。反過來說,傳輸已被加密的文件則不是問題
4.2 Epoll—用於 Linux 的本地非阻塞傳輸
Netty 的 NIO 傳輸基於 Java 提供的非同步/非阻塞網路編程的通用抽象。雖然這保證了 Netty 的非阻塞 API 可以在任何平臺上使用,但它也包含了相應的限制,因為 JDK為了在所有系統上提供相同的功能,必須做出妥協。
Netty為Linux提供了一組NIO API,其以一種和它本身的設計更加一致的方式使用epoll,並且以一種更加輕量的方式使用中斷。如果你的應用程式旨在運行於Linux系統,那麼請考慮利用這個版本的傳輸;你將發現在高負載下它的性能要優於JDK的NIO實現。
這個傳輸的語義和上一節內置傳輸的處理流程圖 所示的完全相同,而且它的用法也是簡單直接的。相關示例參照上面的Netty NIO代碼。如果要在那個代碼中使用 epoll 替代 NIO,只需要將 NioEventLoopGroup替換為EpollEventLoopGroup ,並且將 NioServerSocketChannel.class 替換為EpollServerSocketChannel.class 即可。
4.3 OIO
這個可以參照Netty的OIO代碼即可,最新的Netty中拋棄了OIO的傳輸方式~
下圖是OIO的傳輸處理邏輯:
4.4 用於 JVM 內部通信的 Local 傳輸
Netty 提供了一個 Local 傳輸,用於在同一個 JVM 中運行的客戶端和伺服器程式之間的非同步通信。同樣,這個傳輸也支持對於所有 Netty 傳輸實現都共同的 API。
在這個傳輸中,和伺服器 Channel 相關聯的 SocketAddress 並沒有綁定物理網路地址;相反,只要伺服器還在運行,它就會被存儲在註冊表裡,併在 Channel 關閉時註銷。因為這個傳輸並不接受真正的網路流量,所以它並不能夠和其他傳輸實現進行互操作。因此,客戶端希望連接到(在同一個 JVM 中)使用了這個傳輸的伺服器端時也必須使用它。除了這個限制,它的使用方式和其他的傳輸一模一樣。
4.5 Embedded 傳輸
Netty 提供了一種額外的傳輸,使得你可以將一組 ChannelHandler 作為幫助器類嵌入到其他的 ChannelHandler 內部。通過這種方式,你將可以擴展一個ChannelHandler 的功能,而又不需要修改其內部代碼。
Embedded 傳輸的關鍵是一個被稱為 EmbeddedChannel 的具體的 Channel
實現。我將在之後的文章中為大家演示這種方式。
五、傳輸的用例
Netty支持的傳輸和網路協議:
傳 輸 | TCP | UDP | SCTP | UDT |
---|---|---|---|---|
NIO | × | × | × | × |
Epoll(僅 Linux) | × | × | - | - |
OIO | × | × | × | × |
PS:UDT 協議實現了基於 UDP 協議的可靠傳輸;
在 Linux 上啟用 SCTP
SCTP 需要內核的支持,並且需要安裝用戶庫。
例如,對於 Ubuntu,可以使用下麵的命令:# sudo apt-get install libsctp1
對於 Fedora,可以使用 yum:#sudo yum install kernel-modules-extra.x86_64 lksctp-tools.x86_64
雖然只有SCTP傳輸有這些特殊要求,但是其他傳輸可能也有它們自己的配置選項需要考慮。此外,如果只是為了支持更高的併發連接數,伺服器平臺可能需要配置得和客戶端不一樣。
這裡是一些很可能會遇到的用例
- 非阻塞代碼庫——如果你的代碼庫中沒有阻塞調用(或者你能夠限制它們的範圍),那麼在 Linux 上使用 NIO 或者 epoll 始終是個好主意。雖然 NIO/epoll 旨在處理大量的併發連接,但是在處理較小數目的併發連接時,它也能很好地工作,尤其是考慮到它在連接之間共用線程的方式。
- 阻塞代碼庫——正如我們已經指出的,如果你的代碼庫嚴重地依賴於阻塞 I/O,而且你的應用程式也有一個相應的設計,那麼在你嘗試將其直接轉換為 Netty 的 NIO 傳輸時,你將可能會遇到和阻塞操作相關的問題。不要為此而重寫你的代碼,可以考慮分階段遷移:先從OIO 開始,等你的代碼修改好之後,再遷移到 NIO(或者使用 epoll,如果你在使用 Linux)。
- 在同一個 JVM 內部的通信——在同一個 JVM 內部的通信,不需要通過網路暴露服務,是Local 傳輸的完美用例。這將消除所有真實網路操作的開銷,同時仍然使用你的 Netty 代碼庫。如果隨後需要通過網路暴露服務,那麼你將只需要把傳輸改為 NIO 或者 OIO 即可。
- 測試你的 ChannelHandler 實現——如果你想要為自己的 ChannelHandler 實現編寫單元測試,那麼請考慮使用 Embedded 傳輸。這既便於測試你的代碼,而又不需要創建大量的模擬(mock)對象。你的類將仍然符合常規的 API 事件流,保證該 ChannelHandler在和真實的傳輸一起使用時能夠正確地工作。
應用程式的最佳傳輸建議:
應用程式的需求 | 推薦的傳輸 |
---|---|
非阻塞代碼庫或者一個常規的起點 | NIO(或者在 Linux 上使用 epoll) |
阻塞代碼庫 | OIO |
在同一個 JVM 內部的通信 | Local |
測試 ChannelHandler 的實現 | Embedded |