netty 4.x用戶使用指南

来源:https://www.cnblogs.com/onedayinMay/archive/2020/01/16/12203608.html
-Advertisement-
Play Games

引言 問題 現在我們使用通用的應用程式或庫來相互通信。例如,我們經常使用HTTP客戶機從web伺服器檢索信息,並通過web服務調用遠程過程調用。然而,通用協議或其實現有時不能很好地進行擴展。這就像我們不使用通用HTTP伺服器來交換巨大的文件、電子郵件消息和近乎實時的消息(如財務信息和多人游戲數據)一 ...


引言

問題   現在我們使用通用的應用程式或庫來相互通信。例如,我們經常使用HTTP客戶機從web伺服器檢索信息,並通過web服務調用遠程過程調用。然而,通用協議或其實現有時不能很好地進行擴展。這就像我們不使用通用HTTP伺服器來交換巨大的文件、電子郵件消息和近乎實時的消息(如財務信息和多人游戲數據)一樣。所需要的是一個高度優化的協議實現,專門用於一個特殊目的。例如,您可能希望實現一個針對基於ajax的聊天應用程式、媒體流或大文件傳輸進行優化的HTTP伺服器。您甚至可以設計和實現一個完全根據您的需要量身定製的全新協議。另一個不可避免的情況是,您必須處理遺留的專有協議,以確保與舊系統的互操作性。在這種情況下,重要的是在不犧牲結果應用程式的穩定性和性能的情況下,我們能以多快的速度實現該協議。
  解決方案   Netty項目旨在提供非同步事件驅動的網路應用程式框架和工具,以快速開發可維護的高性能·高可伸縮性協議伺服器和客戶端。
   換句話說,Netty是一個NIO客戶端伺服器框架,它支持協議伺服器和客戶端等網路應用程式的快速輕鬆開發。它極大地簡化和簡化了TCP和UDP套接字伺服器開發等網路編程。
  “快速和簡單”並不意味著最終的應用程式將遭受可維護性或性能問題的影響。Netty是根據從許多協議(如FTP、SMTP、HTTP以及各種基於二進位和文本的遺留協議)的實現中學到的經驗精心設計的。因此,Netty成功地找到了一種無需妥協就可以輕鬆實現開發、性能、穩定性和靈活性的方法。   一些用戶可能已經發現了其他聲稱具有相同優勢的網路應用程式框架,您可能想知道是什麼使Netty與他們如此不同。答案是它所建立的哲學。Netty旨在從一開始就為您提供最舒適的API和實現體驗。它不是什麼有形的東西,但你會意識到這種哲學將使你的生活更容易,當你讀這本指南和玩Netty。   準備開始   本章將介紹Netty的核心結構,並通過簡單的示例讓您快速入門。當你在這一章結束的時候,你將能夠立即在Netty上編寫一個客戶端和一個伺服器。
  如果您更喜歡學習自頂向下的方法,那麼您可能希望從第2章——體繫結構概述開始,然後回到這裡。   在開始之前   運行本章示例的最低要求只有兩個;Netty和JDK 1.6或更高版本的最新版本。最新版本的Netty可在項目下載頁面獲得。要下載正確版本的JDK,請參考您首選的JDK供應商的網站。   編寫Discard服務   世界上最簡單的協議不是“你好,世界!”,而是DISCARD。它是一種協議,在沒有任何響應的情況下丟棄任何接收到的數據。
  要實現拋棄協議,惟一需要做的就是忽略所有接收到的數據。讓我們直接從處理程式實現開始,它處理Netty生成的I/O事件。
package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.丟棄接收到的數據
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
註:1、DiscardServerHandler擴展了ChannelInboundHandlerAdapter,它是ChannelInboundHandler的一個實現。ChannelInboundHandler提供可以覆蓋的各種事件處理程式方法。現在,只需要擴展ChannelInboundHandlerAdapter,而不是自己實現處理程式介面就足夠了。        2、我們在這裡重寫channelRead()事件處理程式方法。當從客戶機接收到新數據時,將使用接收到的消息調用此方法。在本例中,接收到的消息類型是ByteBuf。        3、要實現丟棄協議,處理程式必須忽略接收到的消息。ByteBuf是一個引用計數對象,必須通過release()方法顯式地釋放它。請記住,處理程式有責任釋放傳遞給處理程式的任何引用計數對象。通常,channelRead()處理程式方法是這樣實現的:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  4、異常捕獲()事件處理程式方法在Netty由於I/O錯誤或處理程式實現由於在處理事件時拋出異常而引發異常時被一次性調用。在大多數情況下,應該對捕獲的異常進行日誌記錄,並關閉其關聯的通道,儘管此方法的實現可能因您希望如何處理異常情況而有所不同。例如,您可能希望在關閉連接之前發送帶有錯誤代碼的響應消息。   到目前為止還不錯。我們已經實現了廢棄伺服器的前一半。現在剩下的是編寫main()方法,該方法使用DiscardServerHandler啟動伺服器。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.丟掉所有進來的消息
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 該對象相當於Socket中使用一個線程專門用戶監聽一個socket埠,然後將監聽到的socket對象傳入另一對象
        EventLoopGroup workerGroup = new NioEventLoopGroup();// 該對象相當於Socket中對於每個socket連接都都單獨開闢了一個線程進行數據解析出處理
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}

 註:1、NioEventLoopGroup是一個處理I/O操作的多線程事件迴圈。Netty為不同類型的傳輸提供了各種EventLoopGroup實現。在本例中,我們正在實現一個伺服器端應用程式,因此將使用兩個NioEventLoopGroup。第一個,通常被稱為“老闆”,接受進入的連接。第二個通常稱為“worker”,在boss接受連接並將接受的連接註冊給worker時,它將處理已接受連接的流量。使用多少線程以及如何將它們映射到創建的通道取決於EventLoopGroup實現,甚至可以通過構造函數進行配置。

        2、ServerBootstrap是一個設置伺服器的助手類。您可以直接使用Channel設置伺服器。但是,請註意,這是一個冗長的過程,在大多數情況下不需要這樣做。
        3、在這裡,我們指定使用NioServerSocketChannel類,該類用於實例化一個新通道以接受傳入連接。
        4、這裡指定的處理程式將總是由新接受的通道進行計算。ChannelInitializer是一個用於幫助用戶配置新通道的特殊處理程式。您很可能希望通過添加一些處理程式(如DiscardServerHandler)來實現網路應用程式來配置新通道的ChannelPipeline。隨著應用程式變得複雜,您可能會向管道中添加更多的處理程式,並最終將這個匿名類提取到頂級類中。         5、您還可以設置特定於通道實現的參數。我們正在編寫一個TCP/IP伺服器,所以我們可以設置套接字選項,如tcpNoDelay和keepAlive。請參考ChannelOption的apidocs以及特定的ChannelConfig實現,以獲得關於支持的ChannelOptions的概述。         6、你註意到option()和childOption()嗎?option()用於接受傳入連接的NioServerSocketChannel。childOption()是父ServerChannel接受的通道,在本例中是NioServerSocketChannel         7、我們準備好了。剩下的就是綁定到埠並啟動伺服器。在這裡,我們綁定到機器中所有NICs(網路介面卡)的埠8080。現在,您可以根據需要多次調用bind()方法(使用不同的綁定地址)。
  恭喜你!你剛剛在Netty上完成了你的第一個伺服器。   查看接收到的數據   現在我們已經編寫了第一個伺服器,我們需要測試它是否真正工作。測試它的最簡單方法是使用telnet命令。例如,您可以在命令行中輸入telnet localhost 8080並輸入一些內容。但是,我們可以說伺服器工作得很好嗎?我們無法真正知道這一點,因為它是一個丟棄伺服器。你不會得到任何回應。為了證明它確實有效,讓我們修改伺服器來列印它收到的內容。 我們已經知道,每當接收到數據時都會調用channelRead()方法。讓我們將一些代碼放入DiscardServerHandler的channelRead()方法中:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
註: 1、這個低效的迴圈實際上可以簡化為:System.out.println(in.toString(io.net .util. charsetutil.us_ascii))         2、或者,您可以在這裡執行in.release()。   問題編寫ECHO服務   到目前為止,我們一直在使用數據,而沒有做出任何響應。然而,伺服器通常應該響應請求。讓我們學習如何通過實現ECHO協議向客戶端寫入響應消息,其中所有接收到的數據都被髮回。
  與我們在前幾節中實現的Discards伺服器的唯一區別是,它將接收到的數據發送回伺服器,而不是將接收到的數據列印到控制台。因此,再次修改channelRead()方法就足夠了: 
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.write(msg); // (1)
    ctx.flush(); // (2)
}
註: 1、ChannelHandlerContext對象提供各種操作,使您能夠觸發各種I/O事件和操作。在這裡,我們調用write(Object)以逐字記錄接收到的消息。請註意,我們沒有釋放接收到的消息,這與我們在丟棄示例中所做的不同。這是因為當它被寫在網上時,Netty會為你釋放它。         2、ctx.write(Object)不會將消息寫到線路上。它在內部進行緩衝,然後通過ctx.flush()將其衝到電線上。或者,為了簡潔起見,可以調用ctx.writeAndFlush(msg)。

如果您再次運行telnet命令,您將看到伺服器返回您發送給它的任何內容。

echo伺服器的完整源代碼位於發行版的io.net .example.echo包中。

編寫一個時間服務   本節中要實現的協議是TIME協議。與前面的示例不同的是,它發送一個包含32位整數的消息,而不接收任何請求,並且在消息發送後關閉連接。在本例中,您將學習如何構造和發送消息,以及如何在完成時關閉連接。因為我們將忽略任何接收到的數據,但是在連接建立後立即發送消息,所以這次我們不能使用channelRead()方法。相反,我們應該重寫channelActive()方法。下麵是實現:
package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
註: 1、如前所述,將在建立連接並準備生成通信量時調用channelActive()方法。讓我們寫一個32位整數,表示這個方法中的當前時間。         2、要發送新消息,我們需要分配一個包含消息的新緩衝區。我們要寫一個32位整數,因此我們需要一個ByteBuf,它的容量至少是4位元組。通過ChannelHandlerContext.alloc()獲取當前的ByteBufAllocator,並分配一個新的緩衝區。

        3、像往常一樣,我們編寫構造好的消息。

             但是等等,拋硬幣在哪裡?在使用NIO發送消息之前,我們不是曾經調用java.nio.ByteBuffer.flip()嗎?ByteBuf沒有這樣的方法,因為它有兩個指針;一個用於讀操作,另一個用於寫操作。當您向ByteBuf寫入內容時,寫入器索引會增加,而讀取器索引不會改變。閱讀器索引和寫入器索引分別表示消息開始和結束的位置。

              相反,NIO緩衝區沒有提供一種乾凈的方法來確定消息內容在哪裡開始和結束,而不調用flip方法。當您忘記翻轉緩衝區時,您將遇到麻煩,因為不會發送任何或不正確的數據。在Netty中不會發生這樣的錯誤,因為對於不同的操作類型,我們有不同的指針。當你習慣了它,你會發現它讓你的生活變得更容易——一個沒有翻轉的生活!

              要註意的另一點是ChannelHandlerContext.write()(和writeAndFlush())方法返回ChannelFuture。ChannelFuture表示尚未發生的I/O操作。這意味著,由於Netty中的所有操作都是非同步的,因此可能還沒有執行任何請求的操作。例如,以下代碼可能會在發送消息之前關閉連接:

Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
  因此,您需要在ChannelFuture完成之後調用close()方法,這個方法由write()方法返回,當寫操作完成時,它會通知它的偵聽器。請註意,close()也可能不會立即關閉連接,它將返回ChannelFuture。         4、那麼,當寫請求完成時,我們如何得到通知?這就像在返回的通道未來中添加一個ChannelFutureListener一樣簡單。在這裡,我們創建了一個新的匿名通道futurelistener,它在操作完成時關閉通道。               或者,您可以使用預定義的偵聽器簡化代碼:
f.addListener(ChannelFutureListener.CLOSE);

  要測試我們的時間伺服器是否按預期工作,您可以使用UNIX rdate命令:

$ rdate -o <port> -p <host>
  其中<port>是main()方法中指定的埠號,<host>通常是本地主機。   編寫一個時間客戶端   與Discard伺服器和ECHO伺服器不同,我們需要一個時間協議客戶機,因為人不能將32位二進位數據轉換為日曆上的日期。在本節中,我們將討論如何確保伺服器正確工作,以及如何使用Netty編寫客戶機。
  在Netty中,伺服器和客戶機之間最大也是唯一的區別是使用了不同的引導和通道實現。請查看以下代碼:
package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
註:1、Bootstrap類似於ServerBootstrap,只是它用於非伺服器通道,比如客戶端通道或無連接通道。
    2、如果只指定一個EventLoopGroup,它將作為boss組和工作者組使用。但是,老闆員工並不用於客戶端     3、與NioServerSocketChannel不同,NioSocketChannel用於創建客戶端通道。     4、註意,這裡我們不像使用ServerBootstrap那樣使用childOption(),因為客戶端SocketChannel沒有父節點。     5、我們應該調用connect()方法,而不是bind()方法。 如您所見,它與伺服器端代碼並沒有什麼不同。那麼ChannelHandler實現呢?它應該從伺服器接收一個32位的整數,將其轉換為人類可讀的格式,列印翻譯後的時間,並關閉連接:
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
註:1、在TCP/IP中,Netty將從對等點發送的數據讀入ByteBuf。   它看起來非常簡單,與伺服器端示例沒有任何不同。然而,這個處理程式有時會拒絕啟動IndexOutOfBoundsException。我們將在下一節討論為什麼會發生這種情況。   處理基於流的傳輸   套接字緩衝區的一個小警告   在基於流的傳輸(如TCP/IP)中,接收到的數據存儲在套接字接收緩衝區中。不幸的是,基於流的傳輸的緩衝區不是包的隊列,而是位元組的隊列。這意味著,即使您將兩個消息作為兩個獨立的信息包發送,操作系統也不會將它們視為兩個消息,而只是一堆位元組。因此,不能保證您所閱讀的內容就是您的遠程對等者所寫的內容。例如,假設一個操作系統的 TCP/IP棧接收了三個包:    由於基於流的協議的一般特性,在您的應用程式中很有可能以以下片段形式閱讀它們:   因此,無論接收部分是伺服器端還是客戶端,都應該將接收到的數據碎片整理成應用程式邏輯可以容易理解的一個或多個有意義的幀,應用程式邏輯可以很容易地理解這些幀。對於上面的例子,接收到的數據應該像下麵這樣構造: 第一個解決方案:   現在讓我們回到TIME客戶端示例。我們在這裡遇到同樣的問題。32位整數是非常少量的數據,並且不太可能經常被分段。然而,問題在於它可能是碎片化的,並且隨著流量的增加,碎片化的可能性將增加。   簡單的解決方案是創建一個內部累積緩衝區,並等待所有4個位元組都被接收到內部緩衝區。以下是TimeClientHandler修複此問題的修改實現:
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
註:1、ChannelHandler有兩個生命周期監聽器方法:handlerAdded()和handlerRemoved()。您可以執行任意(de)初始化任務,只要它不會長時間阻塞。     2、首先,應將所有收到的數據累積到buf。     3、然後,處理程式必須檢查buf是否有足夠的數據,在此示例中為4個位元組,然後繼續執行實際的業務邏輯。否則,channelRead()當更多數據到達時,Netty將再次調用該方法,最終將累計所有4個位元組。   第二種解決方案   雖然第一個解決方案已經解決了TIME客戶端的問題,但修改後的處理程式看起來並不幹凈。想象一個更複雜的協議,它由多個欄位組成,例如可變長度欄位。您的ChannelInboundHandler實施將很快變得無法維護。   您可能已經註意到,您可以ChannelHandler為a 添加多個ChannelPipeline,因此,您可以將一個單片拆分ChannelHandler為多個模塊化,以降低應用程式的複雜性。例如,您可以拆分TimeClientHandler為兩個處理程式:
  • TimeDecoder 它涉及碎片問題,以及
  • 最初的簡單版本TimeClientHandler
  幸運的是,Netty提供了一個可擴展的類,可以幫助您編寫第一個開箱即用的類:
package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        out.add(in.readBytes(4)); // (4)
    }
}
註:1、ByteToMessageDecoder是一種實現ChannelInboundHandler,可以很容易地處理碎片問題。        2、ByteToMessageDecoderdecode()每當收到新數據時,都會使用內部維護的累積緩衝區調用該方法。        3、decode()可以決定不向累積緩衝區中沒有足夠數據的地方添加任何內容。當接收到更多數據時,ByteToMessageDecoder將再次調用decode()。       4、如果decode()向out添加一個對象,則表示解碼器成功解碼一條消息。ByteToMessageDecoder將丟棄累積緩衝區的讀取部分。請記住,您不需要解碼多個消息。ByteToMessageDecoder將繼續調用decode()方法,直到它沒有向out添加任何內容為止。   現在我們有另一個處理程式插入到ChannelPipeline中,我們應該修改TimeClient中的ChannelInitializer實現:
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

  如果你是一個喜歡冒險的人,你可能想試試ReplayingDecoder,這將解碼器變得更加簡單。不過,您需要參考API參考以獲得更多信息。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}
  另外,Netty提供了開箱即用的解碼器,它使您能夠非常容易地實現大多數協議,並幫助您避免最終出現難以維護的單塊處理程式實現。更詳細的例子請參考以下包:    用POJO代替ByteBuf   到目前為止,我們所審查的所有示例都使用了ByteBuf作為協議消息的主要數據結構。在本節中,我們將改進TIME協議客戶端和伺服器示例以使用POJO而不是ByteBuf。 在你的ChannelHandler中使用POJO的優勢是顯而易見的; 通過分離ByteBuf從處理程式中提取信息的代碼,您的處理程式變得更易於維護和重用。在TIME客戶端和伺服器示例中,我們只讀取一個32位整數,這不是ByteBuf直接使用的主要問題。但是,您會發現在實現真實世界協議時必須進行分離。   首先,讓我們定義一個名為的新類型UnixTime
package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

  我們現在可以修改它TimeDecoder來產生一個UnixTime而不是一個ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }
    out.add(new UnixTime(in.readUnsignedInt()));
}

  使用更新的解碼器,TimeClientHandler不再使用ByteBuf

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

  更簡單,更優雅,對吧?可以在伺服器端應用相同的技術。我們TimeServerHandler這次更新第一次:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

  現在,唯一缺少的部分是一個編碼器,它的實現ChannelOutboundHandler將一個UnixTime轉換為一個ByteBuf。它比編寫解碼器簡單得多,因為編碼消息時無需處理數據包碎片和彙編。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
註:1、這一行中有很多重要的事情。             首先,我們按原樣傳遞原始文件ChannelPromise,以便當編碼數據實際寫入線路時,Netty將其標記為成功或失敗。             第二,我們沒有調用ctx.flush()。有一個單獨的處理程式方法void flush(ChannelHandlerContext ctx),用於覆蓋flush()操作。   為了進一步簡化,您可以使用MessageToByteEncoder
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}
  最後一個任務是在TimeServerHandler之前將一個TimeEncoder插入到伺服器端ChannelPipeline中,這隻是一個簡單的練習。   關閉你的應用程式   關閉一個Netty應用程式通常與關閉通過shutdowndowns()創建的所有EventLoopGroups一樣簡單。當EventLoopGroup完全終止並且屬於該組的所有通道都已關閉時通知您,它返回一個Future。   概要   在本章中,我們快速瀏覽了Netty,並演示瞭如何在Netty上編寫完整的網路應用程式。   在接下來的章節中有關於Netty的更多詳細信息。我們還鼓勵您查看io.netty.example包中的Netty示例。   另請註意,社區始終在等待您的問題和想法,以幫助您並根據您的反饋不斷改進Netty及其文檔。
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ReactJS簡介 + React 起源於 Facebook 的內部項目,因為該公司對市場上所有 JavaScript MVC 框架,都不滿意,就決定自己寫一套,用來架設 Instagram 的網站。做出來以後,發現這套東西很好用, 就在2013年5月開源了 。 + 由於 React 的設計思想極其 ...
  • JS 百度地圖路書 動態路線 <!DOCTYPE html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <meta name="viewport" content="initial-sca ...
  • 定義 在一個分散式系統(指系統中的節點互相連接並共用數據)中,當涉及讀寫操作時,只能保證一致性 (Consistency)、可用性 (Availability)、分區容錯性 (Partition Tolerance)三者中的兩個,另外一個必須被犧牲。 一致性:CAP中的C和ACID 中的C不是一個含 ...
  • 既要低頭趕路,又要抬頭望天,科技是為人服務的,任何技術背後都有更深層次的考量,在本系列的第一篇文章中我們聊了微服務的本質,它是一種可以加速分工、促進合作的新協作機制。知其然,知其所以然,在第二篇文章中我們剖析了微服務為什麼可以加速分工、促進合作,今天我們再接著來聊聊怎樣開啟微服務架構之旅。 ...
  • 二進位 0000 0000 0000 0000 0000 0000 0000 0001 // 2^0 0000 0000 0000 0000 0000 0000 0000 0010 // 2^1 0000 0000 0000 0000 0000 0000 0000 0100 // 2^2 0000 ...
  • 消息中間件 消息的可靠性傳遞 前言 消息中間件的可靠性消息傳遞,是消息中間件領域非常重要的方案落實問題(在這之前的MQ理論,MQ選型是抽象層次更高的問題,這裡不談)。 並且這個問題與日常開發是存在較大的關聯的。可以這麼說,凡是使用了MQ的,機會都要考慮這個問題。當然也有一些原始數據採集,日誌數據收集 ...
  • 一、訪問網路的兩種方法 1.get:利用參數給伺服器傳遞信息;參數為dict,然後parse解碼 2.post:一般向伺服器傳遞參數使用;post是把信息自動加密處理;如果想要使用post信息,需要使用到data參數 3.Content-Type:application/x-www.form-url ...
  • 前面我們在章節“Socket通訊探索(一)”中如何實現一個tcp連接,但是這僅僅是一個最初級的BIO實現,且沒有添加線程池,實際應用中很少採用這種方式,因為不得不考慮當大量的Tcp連接建立的時候,服務端如何安全穩定的運行?為什麼呢? 1、BIO實現方式,是阻塞式的(上一節最後面的實現方式雖然無數據的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...