netty 4.x用戶使用指南

来源:https://www.cnblogs.com/onedayinMay/archive/2020/01/16/12203608.html
-Advertisement-
Play Games

引言 問題 現在我們使用通用的應用程式或庫來相互通信。例如,我們經常使用HTTP客戶機從web伺服器檢索信息,並通過web服務調用遠程過程調用。然而,通用協議或其實現有時不能很好地進行擴展。這就像我們不使用通用HTTP伺服器來交換巨大的文件、電子郵件消息和近乎實時的消息(如財務信息和多人游戲數據)一 ...


引言

問題   現在我們使用通用的應用程式或庫來相互通信。例如,我們經常使用HTTP客戶機從web伺服器檢索信息,並通過web服務調用遠程過程調用。然而,通用協議或其實現有時不能很好地進行擴展。這就像我們不使用通用HTTP伺服器來交換巨大的文件、電子郵件消息和近乎實時的消息(如財務信息和多人游戲數據)一樣。所需要的是一個高度優化的協議實現,專門用於一個特殊目的。例如,您可能希望實現一個針對基於ajax的聊天應用程式、媒體流或大文件傳輸進行優化的HTTP伺服器。您甚至可以設計和實現一個完全根據您的需要量身定製的全新協議。另一個不可避免的情況是,您必須處理遺留的專有協議,以確保與舊系統的互操作性。在這種情況下,重要的是在不犧牲結果應用程式的穩定性和性能的情況下,我們能以多快的速度實現該協議。
  解決方案   Netty項目旨在提供非同步事件驅動的網路應用程式框架和工具,以快速開發可維護的高性能·高可伸縮性協議伺服器和客戶端。
   換句話說,Netty是一個NIO客戶端伺服器框架,它支持協議伺服器和客戶端等網路應用程式的快速輕鬆開發。它極大地簡化和簡化了TCP和UDP套接字伺服器開發等網路編程。
  “快速和簡單”並不意味著最終的應用程式將遭受可維護性或性能問題的影響。Netty是根據從許多協議(如FTP、SMTP、HTTP以及各種基於二進位和文本的遺留協議)的實現中學到的經驗精心設計的。因此,Netty成功地找到了一種無需妥協就可以輕鬆實現開發、性能、穩定性和靈活性的方法。   一些用戶可能已經發現了其他聲稱具有相同優勢的網路應用程式框架,您可能想知道是什麼使Netty與他們如此不同。答案是它所建立的哲學。Netty旨在從一開始就為您提供最舒適的API和實現體驗。它不是什麼有形的東西,但你會意識到這種哲學將使你的生活更容易,當你讀這本指南和玩Netty。   準備開始   本章將介紹Netty的核心結構,並通過簡單的示例讓您快速入門。當你在這一章結束的時候,你將能夠立即在Netty上編寫一個客戶端和一個伺服器。
  如果您更喜歡學習自頂向下的方法,那麼您可能希望從第2章——體繫結構概述開始,然後回到這裡。   在開始之前   運行本章示例的最低要求只有兩個;Netty和JDK 1.6或更高版本的最新版本。最新版本的Netty可在項目下載頁面獲得。要下載正確版本的JDK,請參考您首選的JDK供應商的網站。   編寫Discard服務   世界上最簡單的協議不是“你好,世界!”,而是DISCARD。它是一種協議,在沒有任何響應的情況下丟棄任何接收到的數據。
  要實現拋棄協議,惟一需要做的就是忽略所有接收到的數據。讓我們直接從處理程式實現開始,它處理Netty生成的I/O事件。
package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.丟棄接收到的數據
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
註:1、DiscardServerHandler擴展了ChannelInboundHandlerAdapter,它是ChannelInboundHandler的一個實現。ChannelInboundHandler提供可以覆蓋的各種事件處理程式方法。現在,只需要擴展ChannelInboundHandlerAdapter,而不是自己實現處理程式介面就足夠了。        2、我們在這裡重寫channelRead()事件處理程式方法。當從客戶機接收到新數據時,將使用接收到的消息調用此方法。在本例中,接收到的消息類型是ByteBuf。        3、要實現丟棄協議,處理程式必須忽略接收到的消息。ByteBuf是一個引用計數對象,必須通過release()方法顯式地釋放它。請記住,處理程式有責任釋放傳遞給處理程式的任何引用計數對象。通常,channelRead()處理程式方法是這樣實現的:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        // Do something with msg
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
  4、異常捕獲()事件處理程式方法在Netty由於I/O錯誤或處理程式實現由於在處理事件時拋出異常而引發異常時被一次性調用。在大多數情況下,應該對捕獲的異常進行日誌記錄,並關閉其關聯的通道,儘管此方法的實現可能因您希望如何處理異常情況而有所不同。例如,您可能希望在關閉連接之前發送帶有錯誤代碼的響應消息。   到目前為止還不錯。我們已經實現了廢棄伺服器的前一半。現在剩下的是編寫main()方法,該方法使用DiscardServerHandler啟動伺服器。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.丟掉所有進來的消息
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 該對象相當於Socket中使用一個線程專門用戶監聽一個socket埠,然後將監聽到的socket對象傳入另一對象
        EventLoopGroup workerGroup = new NioEventLoopGroup();// 該對象相當於Socket中對於每個socket連接都都單獨開闢了一個線程進行數據解析出處理
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}

 註:1、NioEventLoopGroup是一個處理I/O操作的多線程事件迴圈。Netty為不同類型的傳輸提供了各種EventLoopGroup實現。在本例中,我們正在實現一個伺服器端應用程式,因此將使用兩個NioEventLoopGroup。第一個,通常被稱為“老闆”,接受進入的連接。第二個通常稱為“worker”,在boss接受連接並將接受的連接註冊給worker時,它將處理已接受連接的流量。使用多少線程以及如何將它們映射到創建的通道取決於EventLoopGroup實現,甚至可以通過構造函數進行配置。

        2、ServerBootstrap是一個設置伺服器的助手類。您可以直接使用Channel設置伺服器。但是,請註意,這是一個冗長的過程,在大多數情況下不需要這樣做。
        3、在這裡,我們指定使用NioServerSocketChannel類,該類用於實例化一個新通道以接受傳入連接。
        4、這裡指定的處理程式將總是由新接受的通道進行計算。ChannelInitializer是一個用於幫助用戶配置新通道的特殊處理程式。您很可能希望通過添加一些處理程式(如DiscardServerHandler)來實現網路應用程式來配置新通道的ChannelPipeline。隨著應用程式變得複雜,您可能會向管道中添加更多的處理程式,並最終將這個匿名類提取到頂級類中。         5、您還可以設置特定於通道實現的參數。我們正在編寫一個TCP/IP伺服器,所以我們可以設置套接字選項,如tcpNoDelay和keepAlive。請參考ChannelOption的apidocs以及特定的ChannelConfig實現,以獲得關於支持的ChannelOptions的概述。         6、你註意到option()和childOption()嗎?option()用於接受傳入連接的NioServerSocketChannel。childOption()是父ServerChannel接受的通道,在本例中是NioServerSocketChannel         7、我們準備好了。剩下的就是綁定到埠並啟動伺服器。在這裡,我們綁定到機器中所有NICs(網路介面卡)的埠8080。現在,您可以根據需要多次調用bind()方法(使用不同的綁定地址)。
  恭喜你!你剛剛在Netty上完成了你的第一個伺服器。   查看接收到的數據   現在我們已經編寫了第一個伺服器,我們需要測試它是否真正工作。測試它的最簡單方法是使用telnet命令。例如,您可以在命令行中輸入telnet localhost 8080並輸入一些內容。但是,我們可以說伺服器工作得很好嗎?我們無法真正知道這一點,因為它是一個丟棄伺服器。你不會得到任何回應。為了證明它確實有效,讓我們修改伺服器來列印它收到的內容。 我們已經知道,每當接收到數據時都會調用channelRead()方法。讓我們將一些代碼放入DiscardServerHandler的channelRead()方法中:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
註: 1、這個低效的迴圈實際上可以簡化為:System.out.println(in.toString(io.net .util. charsetutil.us_ascii))         2、或者,您可以在這裡執行in.release()。   問題編寫ECHO服務   到目前為止,我們一直在使用數據,而沒有做出任何響應。然而,伺服器通常應該響應請求。讓我們學習如何通過實現ECHO協議向客戶端寫入響應消息,其中所有接收到的數據都被髮回。
  與我們在前幾節中實現的Discards伺服器的唯一區別是,它將接收到的數據發送回伺服器,而不是將接收到的數據列印到控制台。因此,再次修改channelRead()方法就足夠了: 
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.write(msg); // (1)
    ctx.flush(); // (2)
}
註: 1、ChannelHandlerContext對象提供各種操作,使您能夠觸發各種I/O事件和操作。在這裡,我們調用write(Object)以逐字記錄接收到的消息。請註意,我們沒有釋放接收到的消息,這與我們在丟棄示例中所做的不同。這是因為當它被寫在網上時,Netty會為你釋放它。         2、ctx.write(Object)不會將消息寫到線路上。它在內部進行緩衝,然後通過ctx.flush()將其衝到電線上。或者,為了簡潔起見,可以調用ctx.writeAndFlush(msg)。

如果您再次運行telnet命令,您將看到伺服器返回您發送給它的任何內容。

echo伺服器的完整源代碼位於發行版的io.net .example.echo包中。

編寫一個時間服務   本節中要實現的協議是TIME協議。與前面的示例不同的是,它發送一個包含32位整數的消息,而不接收任何請求,並且在消息發送後關閉連接。在本例中,您將學習如何構造和發送消息,以及如何在完成時關閉連接。因為我們將忽略任何接收到的數據,但是在連接建立後立即發送消息,所以這次我們不能使用channelRead()方法。相反,我們應該重寫channelActive()方法。下麵是實現:
package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
註: 1、如前所述,將在建立連接並準備生成通信量時調用channelActive()方法。讓我們寫一個32位整數,表示這個方法中的當前時間。         2、要發送新消息,我們需要分配一個包含消息的新緩衝區。我們要寫一個32位整數,因此我們需要一個ByteBuf,它的容量至少是4位元組。通過ChannelHandlerContext.alloc()獲取當前的ByteBufAllocator,並分配一個新的緩衝區。

        3、像往常一樣,我們編寫構造好的消息。

             但是等等,拋硬幣在哪裡?在使用NIO發送消息之前,我們不是曾經調用java.nio.ByteBuffer.flip()嗎?ByteBuf沒有這樣的方法,因為它有兩個指針;一個用於讀操作,另一個用於寫操作。當您向ByteBuf寫入內容時,寫入器索引會增加,而讀取器索引不會改變。閱讀器索引和寫入器索引分別表示消息開始和結束的位置。

              相反,NIO緩衝區沒有提供一種乾凈的方法來確定消息內容在哪裡開始和結束,而不調用flip方法。當您忘記翻轉緩衝區時,您將遇到麻煩,因為不會發送任何或不正確的數據。在Netty中不會發生這樣的錯誤,因為對於不同的操作類型,我們有不同的指針。當你習慣了它,你會發現它讓你的生活變得更容易——一個沒有翻轉的生活!

              要註意的另一點是ChannelHandlerContext.write()(和writeAndFlush())方法返回ChannelFuture。ChannelFuture表示尚未發生的I/O操作。這意味著,由於Netty中的所有操作都是非同步的,因此可能還沒有執行任何請求的操作。例如,以下代碼可能會在發送消息之前關閉連接:

Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
  因此,您需要在ChannelFuture完成之後調用close()方法,這個方法由write()方法返回,當寫操作完成時,它會通知它的偵聽器。請註意,close()也可能不會立即關閉連接,它將返回ChannelFuture。         4、那麼,當寫請求完成時,我們如何得到通知?這就像在返回的通道未來中添加一個ChannelFutureListener一樣簡單。在這裡,我們創建了一個新的匿名通道futurelistener,它在操作完成時關閉通道。               或者,您可以使用預定義的偵聽器簡化代碼:
f.addListener(ChannelFutureListener.CLOSE);

  要測試我們的時間伺服器是否按預期工作,您可以使用UNIX rdate命令:

$ rdate -o <port> -p <host>
  其中<port>是main()方法中指定的埠號,<host>通常是本地主機。   編寫一個時間客戶端   與Discard伺服器和ECHO伺服器不同,我們需要一個時間協議客戶機,因為人不能將32位二進位數據轉換為日曆上的日期。在本節中,我們將討論如何確保伺服器正確工作,以及如何使用Netty編寫客戶機。
  在Netty中,伺服器和客戶機之間最大也是唯一的區別是使用了不同的引導和通道實現。請查看以下代碼:
package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
註:1、Bootstrap類似於ServerBootstrap,只是它用於非伺服器通道,比如客戶端通道或無連接通道。
    2、如果只指定一個EventLoopGroup,它將作為boss組和工作者組使用。但是,老闆員工並不用於客戶端     3、與NioServerSocketChannel不同,NioSocketChannel用於創建客戶端通道。     4、註意,這裡我們不像使用ServerBootstrap那樣使用childOption(),因為客戶端SocketChannel沒有父節點。     5、我們應該調用connect()方法,而不是bind()方法。 如您所見,它與伺服器端代碼並沒有什麼不同。那麼ChannelHandler實現呢?它應該從伺服器接收一個32位的整數,將其轉換為人類可讀的格式,列印翻譯後的時間,並關閉連接:
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
註:1、在TCP/IP中,Netty將從對等點發送的數據讀入ByteBuf。   它看起來非常簡單,與伺服器端示例沒有任何不同。然而,這個處理程式有時會拒絕啟動IndexOutOfBoundsException。我們將在下一節討論為什麼會發生這種情況。   處理基於流的傳輸   套接字緩衝區的一個小警告   在基於流的傳輸(如TCP/IP)中,接收到的數據存儲在套接字接收緩衝區中。不幸的是,基於流的傳輸的緩衝區不是包的隊列,而是位元組的隊列。這意味著,即使您將兩個消息作為兩個獨立的信息包發送,操作系統也不會將它們視為兩個消息,而只是一堆位元組。因此,不能保證您所閱讀的內容就是您的遠程對等者所寫的內容。例如,假設一個操作系統的 TCP/IP棧接收了三個包:    由於基於流的協議的一般特性,在您的應用程式中很有可能以以下片段形式閱讀它們:   因此,無論接收部分是伺服器端還是客戶端,都應該將接收到的數據碎片整理成應用程式邏輯可以容易理解的一個或多個有意義的幀,應用程式邏輯可以很容易地理解這些幀。對於上面的例子,接收到的數據應該像下麵這樣構造: 第一個解決方案:   現在讓我們回到TIME客戶端示例。我們在這裡遇到同樣的問題。32位整數是非常少量的數據,並且不太可能經常被分段。然而,問題在於它可能是碎片化的,並且隨著流量的增加,碎片化的可能性將增加。   簡單的解決方案是創建一個內部累積緩衝區,並等待所有4個位元組都被接收到內部緩衝區。以下是TimeClientHandler修複此問題的修改實現:
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
註:1、ChannelHandler有兩個生命周期監聽器方法:handlerAdded()和handlerRemoved()。您可以執行任意(de)初始化任務,只要它不會長時間阻塞。     2、首先,應將所有收到的數據累積到buf。     3、然後,處理程式必須檢查buf是否有足夠的數據,在此示例中為4個位元組,然後繼續執行實際的業務邏輯。否則,channelRead()當更多數據到達時,Netty將再次調用該方法,最終將累計所有4個位元組。   第二種解決方案   雖然第一個解決方案已經解決了TIME客戶端的問題,但修改後的處理程式看起來並不幹凈。想象一個更複雜的協議,它由多個欄位組成,例如可變長度欄位。您的ChannelInboundHandler實施將很快變得無法維護。   您可能已經註意到,您可以ChannelHandler為a 添加多個ChannelPipeline,因此,您可以將一個單片拆分ChannelHandler為多個模塊化,以降低應用程式的複雜性。例如,您可以拆分TimeClientHandler為兩個處理程式:
  • TimeDecoder 它涉及碎片問題,以及
  • 最初的簡單版本TimeClientHandler
  幸運的是,Netty提供了一個可擴展的類,可以幫助您編寫第一個開箱即用的類:
package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        out.add(in.readBytes(4)); // (4)
    }
}
註:1、ByteToMessageDecoder是一種實現ChannelInboundHandler,可以很容易地處理碎片問題。        2、ByteToMessageDecoderdecode()每當收到新數據時,都會使用內部維護的累積緩衝區調用該方法。        3、decode()可以決定不向累積緩衝區中沒有足夠數據的地方添加任何內容。當接收到更多數據時,ByteToMessageDecoder將再次調用decode()。       4、如果decode()向out添加一個對象,則表示解碼器成功解碼一條消息。ByteToMessageDecoder將丟棄累積緩衝區的讀取部分。請記住,您不需要解碼多個消息。ByteToMessageDecoder將繼續調用decode()方法,直到它沒有向out添加任何內容為止。   現在我們有另一個處理程式插入到ChannelPipeline中,我們應該修改TimeClient中的ChannelInitializer實現:
b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

  如果你是一個喜歡冒險的人,你可能想試試ReplayingDecoder,這將解碼器變得更加簡單。不過,您需要參考API參考以獲得更多信息。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}
  另外,Netty提供了開箱即用的解碼器,它使您能夠非常容易地實現大多數協議,並幫助您避免最終出現難以維護的單塊處理程式實現。更詳細的例子請參考以下包:    用POJO代替ByteBuf   到目前為止,我們所審查的所有示例都使用了ByteBuf作為協議消息的主要數據結構。在本節中,我們將改進TIME協議客戶端和伺服器示例以使用POJO而不是ByteBuf。 在你的ChannelHandler中使用POJO的優勢是顯而易見的; 通過分離ByteBuf從處理程式中提取信息的代碼,您的處理程式變得更易於維護和重用。在TIME客戶端和伺服器示例中,我們只讀取一個32位整數,這不是ByteBuf直接使用的主要問題。但是,您會發現在實現真實世界協議時必須進行分離。   首先,讓我們定義一個名為的新類型UnixTime
package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

  我們現在可以修改它TimeDecoder來產生一個UnixTime而不是一個ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }
    out.add(new UnixTime(in.readUnsignedInt()));
}

  使用更新的解碼器,TimeClientHandler不再使用ByteBuf

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

  更簡單,更優雅,對吧?可以在伺服器端應用相同的技術。我們TimeServerHandler這次更新第一次:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

  現在,唯一缺少的部分是一個編碼器,它的實現ChannelOutboundHandler將一個UnixTime轉換為一個ByteBuf。它比編寫解碼器簡單得多,因為編碼消息時無需處理數據包碎片和彙編。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
註:1、這一行中有很多重要的事情。             首先,我們按原樣傳遞原始文件ChannelPromise,以便當編碼數據實際寫入線路時,Netty將其標記為成功或失敗。             第二,我們沒有調用ctx.flush()。有一個單獨的處理程式方法void flush(ChannelHandlerContext ctx),用於覆蓋flush()操作。   為了進一步簡化,您可以使用MessageToByteEncoder
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}
  最後一個任務是在TimeServerHandler之前將一個TimeEncoder插入到伺服器端ChannelPipeline中,這隻是一個簡單的練習。   關閉你的應用程式   關閉一個Netty應用程式通常與關閉通過shutdowndowns()創建的所有EventLoopGroups一樣簡單。當EventLoopGroup完全終止並且屬於該組的所有通道都已關閉時通知您,它返回一個Future。   概要   在本章中,我們快速瀏覽了Netty,並演示瞭如何在Netty上編寫完整的網路應用程式。   在接下來的章節中有關於Netty的更多詳細信息。我們還鼓勵您查看io.netty.example包中的Netty示例。   另請註意,社區始終在等待您的問題和想法,以幫助您並根據您的反饋不斷改進Netty及其文檔。
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ReactJS簡介 + React 起源於 Facebook 的內部項目,因為該公司對市場上所有 JavaScript MVC 框架,都不滿意,就決定自己寫一套,用來架設 Instagram 的網站。做出來以後,發現這套東西很好用, 就在2013年5月開源了 。 + 由於 React 的設計思想極其 ...
  • JS 百度地圖路書 動態路線 <!DOCTYPE html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <meta name="viewport" content="initial-sca ...
  • 定義 在一個分散式系統(指系統中的節點互相連接並共用數據)中,當涉及讀寫操作時,只能保證一致性 (Consistency)、可用性 (Availability)、分區容錯性 (Partition Tolerance)三者中的兩個,另外一個必須被犧牲。 一致性:CAP中的C和ACID 中的C不是一個含 ...
  • 既要低頭趕路,又要抬頭望天,科技是為人服務的,任何技術背後都有更深層次的考量,在本系列的第一篇文章中我們聊了微服務的本質,它是一種可以加速分工、促進合作的新協作機制。知其然,知其所以然,在第二篇文章中我們剖析了微服務為什麼可以加速分工、促進合作,今天我們再接著來聊聊怎樣開啟微服務架構之旅。 ...
  • 二進位 0000 0000 0000 0000 0000 0000 0000 0001 // 2^0 0000 0000 0000 0000 0000 0000 0000 0010 // 2^1 0000 0000 0000 0000 0000 0000 0000 0100 // 2^2 0000 ...
  • 消息中間件 消息的可靠性傳遞 前言 消息中間件的可靠性消息傳遞,是消息中間件領域非常重要的方案落實問題(在這之前的MQ理論,MQ選型是抽象層次更高的問題,這裡不談)。 並且這個問題與日常開發是存在較大的關聯的。可以這麼說,凡是使用了MQ的,機會都要考慮這個問題。當然也有一些原始數據採集,日誌數據收集 ...
  • 一、訪問網路的兩種方法 1.get:利用參數給伺服器傳遞信息;參數為dict,然後parse解碼 2.post:一般向伺服器傳遞參數使用;post是把信息自動加密處理;如果想要使用post信息,需要使用到data參數 3.Content-Type:application/x-www.form-url ...
  • 前面我們在章節“Socket通訊探索(一)”中如何實現一個tcp連接,但是這僅僅是一個最初級的BIO實現,且沒有添加線程池,實際應用中很少採用這種方式,因為不得不考慮當大量的Tcp連接建立的時候,服務端如何安全穩定的運行?為什麼呢? 1、BIO實現方式,是阻塞式的(上一節最後面的實現方式雖然無數據的 ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...