# 一、環境準備 Netty需要的運行環境很簡單,只有2個。 - JDK 1.8+ - Apache Maven 3.3.9+ # 二、Netty 客戶端/伺服器概覽 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/c49191e6ee6e448f8c525b450 ...
一、環境準備
Netty需要的運行環境很簡單,只有2個。
- JDK 1.8+
- Apache Maven 3.3.9+
二、Netty 客戶端/伺服器概覽
如圖,展示了一個我們將要編寫的 Echo 客戶端和伺服器應用程式。該圖展示是多個客戶端同時連接到一臺伺服器。所能夠支持的客戶端數量,在理論上,僅受限於系統的可用資源(以及所使用的 JDK 版本可能會施加的限制)。
Echo 客戶端和伺服器之間的交互是非常簡單的;在客戶端建立一個連接之後,它會向伺服器發送一個或多個消息,反過來伺服器又會將每個消息回送給客戶端。雖然它本身看起來好像用處不大,但它充分地體現了客戶端/伺服器系統中典型的請求-響應交互模式。
三、編寫 Echo 伺服器
所有的 Netty 伺服器都需要以下兩部分。
- 至少一個 ChannelHandler—該組件實現了伺服器對從客戶端接收的數據的處理,即它的業務邏輯。
- 引導—這是配置伺服器的啟動代碼。至少,它會將伺服器綁定到它要監聽連接請求的埠上。
3.1 ChannelHandler 和業務邏輯
上一篇博文我們介紹了 Future 和回調,並且闡述了它們在事件驅動設計中的應用。我們還討論了 ChannelHandler,它是一個介面族的父介面,它的實現負責接收並響應事件通知。
在 Netty 應用程式中,所有的數據處理邏輯都包含在這些核心抽象的實現中。因為你的 Echo 伺服器會響應傳入的消息,所以它需要實現ChannelInboundHandler 介面,用來定義響應入站事件的方法。簡單的應用程式只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的預設實現。
我們將要用到的方法是:
- channelRead() :對於每個傳入的消息都要調用;
- channelReadComplete() : 通知ChannelInboundHandler最後一次對channelRead()的調用是當前批量讀取中的最後一條消息;
- exceptionCaught() :在讀取操作期間,有異常拋出時會調用。
該 Echo 伺服器的 ChannelHandler 實現是 EchoServerHandler,如代碼:
package com.example.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author lhd
* @date 2023/05/16 15:05
* @notes Netty Echo服務端簡單邏輯
*/
//表示channel可以並多個實例共用,它是線程安全的
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
//將消息列印到控制台
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
//將收到的消息寫給發送者,而不沖刷出站消息
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//將未決消息沖刷到遠程節點,並且關閉該 Channe
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//列印異常堆棧跟蹤
cause.printStackTrace();
//關閉該channel
ctx.close();
}
}
ChannelInboundHandlerAdapter 有一個直觀的 API,並且它的每個方法都可以被重寫以掛鉤到事件生命周期的恰當點上。
因為需要處理所有接收到的數據,所以我們重寫了 channelRead() 方法。在這個伺服器應用程式中,我們將數據簡單地回送給了遠程節點。
重寫 exceptionCaught() 方法允許我們對 Throwable 的任何子類型做出反應,在這裡你記錄了異常並關閉了連接。
雖然一個更加完善的應用程式也許會嘗試從異常中恢復,但在這個場景下,只是通過簡單地關閉連接來通知遠程節點發生了錯誤。
ps:如果不捕獲異常,會發生什麼呢?
每個 Channel 都擁有一個與之相關聯的 ChannelPipeline,其持有一個 ChannelHandler 的實例鏈。在預設的情況下,ChannelHandler 會把對它的方法的調用轉發給鏈中的下一個 ChannelHandler。因此,如果 exceptionCaught()方法沒有被該鏈中的某處實現,那麼所接收的異常將會被傳遞到 ChannelPipeline 的尾端並被記錄。為此,你的應用程式應該提供至少有一個實現exceptionCaught()方法的 ChannelHandler。
除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習ChannelHandler的子類型和實現。這些之後會一一說明,目前,我們只關註:
- 針對不同類型的事件來調用 ChannelHandler;
- 應用程式通過實現或者擴展 ChannelHandler 來掛鉤到事件的生命周期,並且提供自定義的應用程式邏輯;
- 在架構上,ChannelHandler 有助於保持業務邏輯與網路處理代碼的分離。這簡化了開發過程,因為代碼必須不斷地演化以響應不斷變化的需求。
3.2 引導伺服器
下麵我們準備開始構建伺服器。構建伺服器涉及到兩個內容:
- 綁定到伺服器將在其上監聽並接受傳入連接請求的埠;
- 配置 Channel,以將有關的入站消息通知給 EchoServerHandler 實例。
package com.example.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* @author lhd
* @date 2023/05/16 15:21
* @notes Netty引導伺服器
*/
public class EchoServer {
public static void main(String[] args) throws Exception {
//調用伺服器的 start()方法
new EchoServer().start();
}
public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
//創建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
//創建ServerBootstra
ServerBootstrap b = new ServerBootstrap();
//指定伺服器監視埠
int port = 8080;
b.group(group)
//指定所使用的 NIO 傳輸 Channel
//因為我們正在使用的是 NIO 傳輸,所以你指定了 NioEventLoopGroup 來接受和處理新的連接,
// 並且將 Channel 的類型指定為 NioServerSocketChannel 。
.channel(NioServerSocketChannel.class)
//使用指定的埠設置套接字地址
//將本地地址設置為一個具有選定埠的 InetSocketAddress 。伺服器將綁定到這個地址以監聽新的連接請求
.localAddress(new InetSocketAddress(port))
//添加一個EchoServerHandler 到子Channel的 ChannelPipeline
//這裡使用了一個特殊的類——ChannelInitializer。這是關鍵。
// 當一個新的連接被接受時,一個新的子 Channel 將會被創建,而 ChannelInitializer 將會把一個你的
//EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中。正如我們之前所解釋的,
// 這個 ChannelHandler 將會收到有關入站消息的通知。
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception {
//EchoServerHandler 被標註為 @Shareable,所以我們可以總是使用同樣的實例
//實際上所有客戶端都是使用的同一個EchoServerHandler
ch.pipeline().addLast(serverHandler);
}
});
//非同步地綁定伺服器,調用 sync()方法阻塞等待直到綁定完成
//sync()方法的調用將導致當前 Thread阻塞,一直到綁定操作完成為止
ChannelFuture f = b.bind().sync();
//獲取 Channel 的CloseFuture,並且阻塞當前線
//該應用程式將會阻塞等待直到伺服器的 Channel關閉(因為你在 Channel 的 CloseFuture 上調用了 sync()方法)
f.channel().closeFuture().sync();
} finally {
//關閉 EventLoopGroup,釋放所有的資源,包括所有被創建的線程
group.shutdownGracefully().sync();
}
}
}
我們總結一下伺服器實現中的重要步驟。下麵這些是伺服器的主要代碼組件:
- EchoServerHandler 實現了業務邏輯;
- main()方法引導了伺服器;
引導過程中所需要的步驟如下:- 創建一個 ServerBootstrap 的實例以引導和綁定伺服器;
- 創建並分配一個 NioEventLoopGroup 實例以進行事件的處理,如接受新連接以及讀/寫數據;
- 指定伺服器綁定的本地的 InetSocketAddress;
- 使用一個 EchoServerHandler 的實例初始化每一個新的 Channel;
- 調用 ServerBootstrap.bind()方法以綁定伺服器。
到此我們的引導伺服器已經完成。
四、編寫 Echo 客戶端
Echo 客戶端將會:
(1)連接到伺服器;
(2)發送一個或者多個消息;
(3)對於每個消息,等待並接收從伺服器發回的相同的消息;
(4)關閉連接。
編寫客戶端所涉及的兩個主要代碼部分也是業務邏輯和引導,和你在伺服器中看到的一樣。
4.1 通過 ChannelHandler 實現客戶端邏輯
如同伺服器,客戶端將擁有一個用來處理數據的 ChannelInboundHandler。在這個場景下,我們將擴展 SimpleChannelInboundHandler 類以處理所有必須的任務。這要求重寫下麵的方法:
- channelActive() : 在到伺服器的連接已經建立之後將被調用;
- channelRead0() : 當從伺服器接收到一條消息時被調用;
- exceptionCaught() :在處理過程中引發異常時被調用。
具體代碼可以參考如下:
package com.example.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @author lhd
* @date 2023/05/16 15:45
* @notes Netty 簡單的客戶端邏輯
*/
//標記該類的實例可以被多個 Channel 共用
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
//當被通知 Channel是活躍的時候,發送一條消息
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
//記錄已接收消息的轉儲
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}
//在發生異常時,記錄錯誤並關閉Channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
首先,我們重寫了 channelActive() 方法,其將在一個連接建立時被調用。這確保了數據將會被儘可能快地寫入伺服器,其在這個場景下是一個編碼了字元串"Netty rocks!"的位元組緩衝區。
接下來,我們重寫了 channelRead0() 方法。每當接收數據時,都會調用這個方法。由伺服器發送的消息可能會被分塊接收。也就是說,如果伺服器發送了 5 位元組,那麼不能保證這 5 位元組會被一次性接收。即使是對於這麼少量的數據,channelRead0()方法也可能會被調用兩次,第一次使用一個持有 3 位元組的 ByteBuf(Netty 的位元組容器),第二次使用一個持有 2 位元組的 ByteBuf。作為一個面向流的協議,TCP 保證了位元組數組將會按照伺服器發送它們的順序被接收。
ps:所以channelRead0()的調用次數不一定等於伺服器發佈消息的次數
重寫的第三個方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的代碼示例)中所示,記錄 Throwable,關閉 Channel,在這個場景下,終止到伺服器的連接。
ps:為什麼客戶端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?
在客戶端,當 channelRead0()方法完成時,我們已經有了傳入消息,並且已經處理完它了。當該方法返回時,SimpleChannelInboundHandler 負責釋放指向保存該消息的 ByteBuf 的記憶體引用。
在 EchoServerHandler 中,我們仍然需要將傳入消息回送給發送者,而 write()操作是非同步的,直到 channelRead()方法返回後可能仍然沒有完成。為此,EchoServerHandler擴展了 ChannelInboundHandlerAdapter,其在這個時間點上不會釋放消息。消息在 EchoServerHandler 的 channelReadComplete()方法中,當 writeAndFlush()方法被調用時被釋放。
4.2 引導客戶端
引導客戶端類似於引導伺服器,不同的是,客戶端是使用主機和埠參數來連接遠程地址,也就是這裡的 Echo 伺服器的地址,而不是綁定到一個一直被監聽的埠。
package com.example.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* @author lhd
* @date 2023/05/16 15:59
* @notes 引導客戶端
*/
public class EchoClient {
public void start() throws Exception {
//指定 EventLoopGroup 以處理客戶端事件;需要適用於 NIO 的實現
EventLoopGroup group = new NioEventLoopGroup();
try {
//創建 Bootstrap
Bootstrap b = new Bootstrap();
b.group(group)
//適用於 NIO 傳輸的 Channel 類型
.channel(NioSocketChannel.class)
//設置伺服器的InetSocketAddress
.remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//在創建Channel時,向 ChannelPipeline中添加一個 EchoClientHandler 實例
ch.pipeline().addLast(new EchoClientHandler());}
});
//連接到遠程節點,阻塞等待直到連接完成
ChannelFuture f = b.connect().sync();
//阻塞,直到Channel 關閉
f.channel().closeFuture().sync();
} finally {
//關閉線程池並且釋放所有的資源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new EchoClient().start();
}
}
總結一下要點:
- 為初始化客戶端,創建了一個 Bootstrap 實例;
- 為進行事件處理分配了一個 NioEventLoopGroup 實例,其中事件處理包括創建新的連接以及處理入站和出站數據;
- 為伺服器連接創建了一個 InetSocketAddress 實例;
- 當連接被建立時,一個 EchoClientHandler 實例會被安裝到(該 Channel 的)
ChannelPipeline 中; - 在一切都設置完成後,調用 Bootstrap.connect()方法連接到遠程節點;
綜上客戶端的構建已經完成。
五、構建和運行 Echo 伺服器和客戶端
將我們上面的代碼複製到IDEA中運行,先啟動服務端在啟動客戶端會得到以下預期效果:
服務端控制台列印:
客戶端控制台列印:
我們關閉服務端後,客戶端控制台列印:
因為服務端關閉,觸發了客戶端 EchoClientHandler 中的exceptionCaught()方法,列印出了異常堆棧並關閉了連接。
這隻是一個簡單的應用程式,但是它可以伸縮到支持數千個併發連接——每秒可以比普通的基於套接字的 Java 應用程式處理多得多的消息。