引言 問題 現在我們使用通用的應用程式或庫來相互通信。例如,我們經常使用HTTP客戶機從web伺服器檢索信息,並通過web服務調用遠程過程調用。然而,通用協議或其實現有時不能很好地進行擴展。這就像我們不使用通用HTTP伺服器來交換巨大的文件、電子郵件消息和近乎實時的消息(如財務信息和多人游戲數據)一 ...
引言
問題 現在我們使用通用的應用程式或庫來相互通信。例如,我們經常使用HTTP客戶機從web伺服器檢索信息,並通過web服務調用遠程過程調用。然而,通用協議或其實現有時不能很好地進行擴展。這就像我們不使用通用HTTP伺服器來交換巨大的文件、電子郵件消息和近乎實時的消息(如財務信息和多人游戲數據)一樣。所需要的是一個高度優化的協議實現,專門用於一個特殊目的。例如,您可能希望實現一個針對基於ajax的聊天應用程式、媒體流或大文件傳輸進行優化的HTTP伺服器。您甚至可以設計和實現一個完全根據您的需要量身定製的全新協議。另一個不可避免的情況是,您必須處理遺留的專有協議,以確保與舊系統的互操作性。在這種情況下,重要的是在不犧牲結果應用程式的穩定性和性能的情況下,我們能以多快的速度實現該協議。解決方案 Netty項目旨在提供非同步事件驅動的網路應用程式框架和工具,以快速開發可維護的高性能·高可伸縮性協議伺服器和客戶端。
換句話說,Netty是一個NIO客戶端伺服器框架,它支持協議伺服器和客戶端等網路應用程式的快速輕鬆開發。它極大地簡化和簡化了TCP和UDP套接字伺服器開發等網路編程。
“快速和簡單”並不意味著最終的應用程式將遭受可維護性或性能問題的影響。Netty是根據從許多協議(如FTP、SMTP、HTTP以及各種基於二進位和文本的遺留協議)的實現中學到的經驗精心設計的。因此,Netty成功地找到了一種無需妥協就可以輕鬆實現開發、性能、穩定性和靈活性的方法。 一些用戶可能已經發現了其他聲稱具有相同優勢的網路應用程式框架,您可能想知道是什麼使Netty與他們如此不同。答案是它所建立的哲學。Netty旨在從一開始就為您提供最舒適的API和實現體驗。它不是什麼有形的東西,但你會意識到這種哲學將使你的生活更容易,當你讀這本指南和玩Netty。 準備開始 本章將介紹Netty的核心結構,並通過簡單的示例讓您快速入門。當你在這一章結束的時候,你將能夠立即在Netty上編寫一個客戶端和一個伺服器。
如果您更喜歡學習自頂向下的方法,那麼您可能希望從第2章——體繫結構概述開始,然後回到這裡。 在開始之前 運行本章示例的最低要求只有兩個;Netty和JDK 1.6或更高版本的最新版本。最新版本的Netty可在項目下載頁面獲得。要下載正確版本的JDK,請參考您首選的JDK供應商的網站。 編寫Discard服務 世界上最簡單的協議不是“你好,世界!”,而是
DISCARD
。它是一種協議,在沒有任何響應的情況下丟棄任何接收到的數據。要實現拋棄協議,惟一需要做的就是忽略所有接收到的數據。讓我們直接從處理程式實現開始,它處理Netty生成的I/O事件。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.丟棄接收到的數據
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
註:1、DiscardServerHandler擴展了ChannelInboundHandlerAdapter,它是ChannelInboundHandler的一個實現。ChannelInboundHandler提供可以覆蓋的各種事件處理程式方法。現在,只需要擴展ChannelInboundHandlerAdapter,而不是自己實現處理程式介面就足夠了。
2、我們在這裡重寫channelRead()事件處理程式方法。當從客戶機接收到新數據時,將使用接收到的消息調用此方法。在本例中,接收到的消息類型是ByteBuf。
3、要實現丟棄協議,處理程式必須忽略接收到的消息。ByteBuf是一個引用計數對象,必須通過release()方法顯式地釋放它。請記住,處理程式有責任釋放傳遞給處理程式的任何引用計數對象。通常,channelRead()處理程式方法是這樣實現的:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
4、異常捕獲()事件處理程式方法在Netty由於I/O錯誤或處理程式實現由於在處理事件時拋出異常而引發異常時被一次性調用。在大多數情況下,應該對捕獲的異常進行日誌記錄,並關閉其關聯的通道,儘管此方法的實現可能因您希望如何處理異常情況而有所不同。例如,您可能希望在關閉連接之前發送帶有錯誤代碼的響應消息。
到目前為止還不錯。我們已經實現了廢棄伺服器的前一半。現在剩下的是編寫main()方法,該方法使用DiscardServerHandler啟動伺服器。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.丟掉所有進來的消息
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 該對象相當於Socket中使用一個線程專門用戶監聽一個socket埠,然後將監聽到的socket對象傳入另一對象
EventLoopGroup workerGroup = new NioEventLoopGroup();// 該對象相當於Socket中對於每個socket連接都都單獨開闢了一個線程進行數據解析出處理
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
註:1、NioEventLoopGroup是一個處理I/O操作的多線程事件迴圈。Netty為不同類型的傳輸提供了各種EventLoopGroup實現。在本例中,我們正在實現一個伺服器端應用程式,因此將使用兩個NioEventLoopGroup。第一個,通常被稱為“老闆”,接受進入的連接。第二個通常稱為“worker”,在boss接受連接並將接受的連接註冊給worker時,它將處理已接受連接的流量。使用多少線程以及如何將它們映射到創建的通道取決於EventLoopGroup實現,甚至可以通過構造函數進行配置。
2、ServerBootstrap是一個設置伺服器的助手類。您可以直接使用Channel
設置伺服器。但是,請註意,這是一個冗長的過程,在大多數情況下不需要這樣做。3、在這裡,我們指定使用NioServerSocketChannel類,該類用於實例化一個新通道以接受傳入連接。
4、這裡指定的處理程式將總是由新接受的通道進行計算。ChannelInitializer是一個用於幫助用戶配置新通道的特殊處理程式。您很可能希望通過添加一些處理程式(如DiscardServerHandler)來實現網路應用程式來配置新通道的ChannelPipeline。隨著應用程式變得複雜,您可能會向管道中添加更多的處理程式,並最終將這個匿名類提取到頂級類中。 5、您還可以設置特定於通道實現的參數。我們正在編寫一個TCP/IP伺服器,所以我們可以設置套接字選項,如tcpNoDelay和keepAlive。請參考ChannelOption的apidocs以及特定的ChannelConfig實現,以獲得關於支持的ChannelOptions的概述。 6、你註意到option()和childOption()嗎?option()用於接受傳入連接的NioServerSocketChannel。childOption()是父ServerChannel接受的通道,在本例中是NioServerSocketChannel 7、我們準備好了。剩下的就是綁定到埠並啟動伺服器。在這裡,我們綁定到機器中所有NICs(網路介面卡)的埠8080。現在,您可以根據需要多次調用bind()方法(使用不同的綁定地址)。
恭喜你!你剛剛在Netty上完成了你的第一個伺服器。 查看接收到的數據 現在我們已經編寫了第一個伺服器,我們需要測試它是否真正工作。測試它的最簡單方法是使用telnet命令。例如,您可以在命令行中輸入telnet localhost 8080並輸入一些內容。但是,我們可以說伺服器工作得很好嗎?我們無法真正知道這一點,因為它是一個丟棄伺服器。你不會得到任何回應。為了證明它確實有效,讓我們修改伺服器來列印它收到的內容。 我們已經知道,每當接收到數據時都會調用channelRead()方法。讓我們將一些代碼放入DiscardServerHandler的channelRead()方法中:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
註: 1、這個低效的迴圈實際上可以簡化為:System.out.println(in.toString(io.net .util. charsetutil.us_ascii))
2、或者,您可以在這裡執行in.release()。
問題編寫ECHO服務
到目前為止,我們一直在使用數據,而沒有做出任何響應。然而,伺服器通常應該響應請求。讓我們學習如何通過實現ECHO協議向客戶端寫入響應消息,其中所有接收到的數據都被髮回。與我們在前幾節中實現的Discards伺服器的唯一區別是,它將接收到的數據發送回伺服器,而不是將接收到的數據列印到控制台。因此,再次修改channelRead()方法就足夠了:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
註: 1、ChannelHandlerContext對象提供各種操作,使您能夠觸發各種I/O事件和操作。在這裡,我們調用write(Object)以逐字記錄接收到的消息。請註意,我們沒有釋放接收到的消息,這與我們在丟棄示例中所做的不同。這是因為當它被寫在網上時,Netty會為你釋放它。
2、ctx.write(Object)不會將消息寫到線路上。它在內部進行緩衝,然後通過ctx.flush()將其衝到電線上。或者,為了簡潔起見,可以調用ctx.writeAndFlush(msg)。
如果您再次運行telnet命令,您將看到伺服器返回您發送給它的任何內容。
echo伺服器的完整源代碼位於發行版的io.net .example.echo包中。
編寫一個時間服務 本節中要實現的協議是TIME協議。與前面的示例不同的是,它發送一個包含32位整數的消息,而不接收任何請求,並且在消息發送後關閉連接。在本例中,您將學習如何構造和發送消息,以及如何在完成時關閉連接。因為我們將忽略任何接收到的數據,但是在連接建立後立即發送消息,所以這次我們不能使用channelRead()方法。相反,我們應該重寫channelActive()方法。下麵是實現:package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
註: 1、如前所述,將在建立連接並準備生成通信量時調用channelActive()方法。讓我們寫一個32位整數,表示這個方法中的當前時間。
2、要發送新消息,我們需要分配一個包含消息的新緩衝區。我們要寫一個32位整數,因此我們需要一個ByteBuf,它的容量至少是4位元組。通過ChannelHandlerContext.alloc()獲取當前的ByteBufAllocator,並分配一個新的緩衝區。
3、像往常一樣,我們編寫構造好的消息。
但是等等,拋硬幣在哪裡?在使用NIO發送消息之前,我們不是曾經調用java.nio.ByteBuffer.flip()嗎?ByteBuf沒有這樣的方法,因為它有兩個指針;一個用於讀操作,另一個用於寫操作。當您向ByteBuf寫入內容時,寫入器索引會增加,而讀取器索引不會改變。閱讀器索引和寫入器索引分別表示消息開始和結束的位置。
相反,NIO緩衝區沒有提供一種乾凈的方法來確定消息內容在哪裡開始和結束,而不調用flip方法。當您忘記翻轉緩衝區時,您將遇到麻煩,因為不會發送任何或不正確的數據。在Netty中不會發生這樣的錯誤,因為對於不同的操作類型,我們有不同的指針。當你習慣了它,你會發現它讓你的生活變得更容易——一個沒有翻轉的生活!
要註意的另一點是ChannelHandlerContext.write()(和writeAndFlush())方法返回ChannelFuture。ChannelFuture表示尚未發生的I/O操作。這意味著,由於Netty中的所有操作都是非同步的,因此可能還沒有執行任何請求的操作。例如,以下代碼可能會在發送消息之前關閉連接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此,您需要在ChannelFuture完成之後調用close()方法,這個方法由write()方法返回,當寫操作完成時,它會通知它的偵聽器。請註意,close()也可能不會立即關閉連接,它將返回ChannelFuture。
4、那麼,當寫請求完成時,我們如何得到通知?這就像在返回的通道未來中添加一個ChannelFutureListener一樣簡單。在這裡,我們創建了一個新的匿名通道futurelistener,它在操作完成時關閉通道。
或者,您可以使用預定義的偵聽器簡化代碼:
f.addListener(ChannelFutureListener.CLOSE);
要測試我們的時間伺服器是否按預期工作,您可以使用UNIX rdate命令:
$ rdate -o <port> -p <host>其中<port>是main()方法中指定的埠號,<host>通常是本地主機。 編寫一個時間客戶端 與Discard伺服器和ECHO伺服器不同,我們需要一個時間協議客戶機,因為人不能將32位二進位數據轉換為日曆上的日期。在本節中,我們將討論如何確保伺服器正確工作,以及如何使用Netty編寫客戶機。
在Netty中,伺服器和客戶機之間最大也是唯一的區別是使用了不同的引導和通道實現。請查看以下代碼:
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
註:1、Bootstrap類似於ServerBootstrap,只是它用於非伺服器通道,比如客戶端通道或無連接通道。2、如果只指定一個EventLoopGroup,它將作為boss組和工作者組使用。但是,老闆員工並不用於客戶端 3、與NioServerSocketChannel不同,NioSocketChannel用於創建客戶端通道。 4、註意,這裡我們不像使用ServerBootstrap那樣使用childOption(),因為客戶端SocketChannel沒有父節點。 5、我們應該調用connect()方法,而不是bind()方法。 如您所見,它與伺服器端代碼並沒有什麼不同。那麼ChannelHandler實現呢?它應該從伺服器接收一個32位的整數,將其轉換為人類可讀的格式,列印翻譯後的時間,並關閉連接:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
註:1、在TCP/IP中,Netty將從對等點發送的數據讀入ByteBuf。
它看起來非常簡單,與伺服器端示例沒有任何不同。然而,這個處理程式有時會拒絕啟動IndexOutOfBoundsException。我們將在下一節討論為什麼會發生這種情況。
處理基於流的傳輸
套接字緩衝區的一個小警告
在基於流的傳輸(如TCP/IP)中,接收到的數據存儲在套接字接收緩衝區中。不幸的是,基於流的傳輸的緩衝區不是包的隊列,而是位元組的隊列。這意味著,即使您將兩個消息作為兩個獨立的信息包發送,操作系統也不會將它們視為兩個消息,而只是一堆位元組。因此,不能保證您所閱讀的內容就是您的遠程對等者所寫的內容。例如,假設一個操作系統的
TCP/IP棧接收了三個包:
由於基於流的協議的一般特性,在您的應用程式中很有可能以以下片段形式閱讀它們:
因此,無論接收部分是伺服器端還是客戶端,都應該將接收到的數據碎片整理成應用程式邏輯可以容易理解的一個或多個有意義的幀,應用程式邏輯可以很容易地理解這些幀。對於上面的例子,接收到的數據應該像下麵這樣構造:
第一個解決方案:
現在讓我們回到TIME
客戶端示例。我們在這裡遇到同樣的問題。32位整數是非常少量的數據,並且不太可能經常被分段。然而,問題在於它可能是碎片化的,並且隨著流量的增加,碎片化的可能性將增加。
簡單的解決方案是創建一個內部累積緩衝區,並等待所有4個位元組都被接收到內部緩衝區。以下是TimeClientHandler
修複此問題的修改實現:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
註:1、ChannelHandler有兩個生命周期監聽器方法:handlerAdded()和handlerRemoved()。您可以執行任意(de)初始化任務,只要它不會長時間阻塞。
2、首先,應將所有收到的數據累積到buf。
3、然後,處理程式必須檢查buf是否有足夠的數據,在此示例中為4個位元組,然後繼續執行實際的業務邏輯。否則,channelRead()當更多數據到達時,Netty將再次調用該方法,最終將累計所有4個位元組。
第二種解決方案
雖然第一個解決方案已經解決了TIME
客戶端的問題,但修改後的處理程式看起來並不幹凈。想象一個更複雜的協議,它由多個欄位組成,例如可變長度欄位。您的ChannelInboundHandler
實施將很快變得無法維護。
您可能已經註意到,您可以ChannelHandler
為a 添加多個ChannelPipeline
,因此,您可以將一個單片拆分ChannelHandler
為多個模塊化,以降低應用程式的複雜性。例如,您可以拆分TimeClientHandler
為兩個處理程式:
TimeDecoder
它涉及碎片問題,以及- 最初的簡單版本
TimeClientHandler
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
註:1、ByteToMessageDecoder是一種實現ChannelInboundHandler,可以很容易地處理碎片問題。
2、ByteToMessageDecoderdecode()每當收到新數據時,都會使用內部維護的累積緩衝區調用該方法。
3、decode()可以決定不向累積緩衝區中沒有足夠數據的地方添加任何內容。當接收到更多數據時,ByteToMessageDecoder將再次調用decode()。
4、如果decode()向out添加一個對象,則表示解碼器成功解碼一條消息。ByteToMessageDecoder將丟棄累積緩衝區的讀取部分。請記住,您不需要解碼多個消息。ByteToMessageDecoder將繼續調用decode()方法,直到它沒有向out添加任何內容為止。
現在我們有另一個處理程式插入到ChannelPipeline中,我們應該修改TimeClient中的ChannelInitializer實現:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果你是一個喜歡冒險的人,你可能想試試ReplayingDecoder,這將解碼器變得更加簡單。不過,您需要參考API參考以獲得更多信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
另外,Netty提供了開箱即用的解碼器,它使您能夠非常容易地實現大多數協議,並幫助您避免最終出現難以維護的單塊處理程式實現。更詳細的例子請參考以下包:
io.netty.example.factorial
基於二進位協議io.netty.example.telnet
基於文本行的協議.
ByteBuf
作為協議消息的主要數據結構。在本節中,我們將改進TIME
協議客戶端和伺服器示例以使用POJO而不是ByteBuf
。
在你的ChannelHandler
中使用POJO的優勢是顯而易見的; 通過分離ByteBuf
從處理程式中提取信息的代碼,您的處理程式變得更易於維護和重用。在TIME
客戶端和伺服器示例中,我們只讀取一個32位整數,這不是ByteBuf
直接使用的主要問題。但是,您會發現在實現真實世界協議時必須進行分離。
首先,讓我們定義一個名為的新類型UnixTime
。
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
我們現在可以修改它TimeDecoder
來產生一個UnixTime
而不是一個ByteBuf
。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
使用更新的解碼器,TimeClientHandler
不再使用ByteBuf
:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
更簡單,更優雅,對吧?可以在伺服器端應用相同的技術。我們TimeServerHandler
這次更新第一次:
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
現在,唯一缺少的部分是一個編碼器,它的實現ChannelOutboundHandler
將一個UnixTime
轉換為一個ByteBuf
。它比編寫解碼器簡單得多,因為編碼消息時無需處理數據包碎片和彙編。
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
註:1、這一行中有很多重要的事情。
首先,我們按原樣傳遞原始文件ChannelPromise,以便當編碼數據實際寫入線路時,Netty將其標記為成功或失敗。
第二,我們沒有調用ctx.flush()。有一個單獨的處理程式方法void flush(ChannelHandlerContext ctx),用於覆蓋flush()操作。
為了進一步簡化,您可以使用MessageToByteEncoder
:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
最後一個任務是在TimeServerHandler之前將一個TimeEncoder插入到伺服器端ChannelPipeline中,這隻是一個簡單的練習。
關閉你的應用程式
關閉一個Netty應用程式通常與關閉通過shutdowndowns()創建的所有EventLoopGroups一樣簡單。當EventLoopGroup完全終止並且屬於該組的所有通道都已關閉時通知您,它返回一個Future
。
概要
在本章中,我們快速瀏覽了Netty,並演示瞭如何在Netty上編寫完整的網路應用程式。
在接下來的章節中有關於Netty的更多詳細信息。我們還鼓勵您查看io.netty.example
包中的Netty示例。
另請註意,社區始終在等待您的問題和想法,以幫助您並根據您的反饋不斷改進Netty及其文檔。