Netty實戰(二)

来源:https://www.cnblogs.com/kimi77/archive/2023/05/24/17427536.html
-Advertisement-
Play Games

# 一、環境準備 Netty需要的運行環境很簡單,只有2個。 - JDK 1.8+ - Apache Maven 3.3.9+ # 二、Netty 客戶端/伺服器概覽 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/c49191e6ee6e448f8c525b450 ...


一、環境準備

Netty需要的運行環境很簡單,只有2個。

  • JDK 1.8+
  • Apache Maven 3.3.9+

二、Netty 客戶端/伺服器概覽

在這裡插入圖片描述
如圖,展示了一個我們將要編寫的 Echo 客戶端和伺服器應用程式。該圖展示是多個客戶端同時連接到一臺伺服器。所能夠支持的客戶端數量,在理論上,僅受限於系統的可用資源(以及所使用的 JDK 版本可能會施加的限制)。

Echo 客戶端和伺服器之間的交互是非常簡單的;在客戶端建立一個連接之後,它會向伺服器發送一個或多個消息,反過來伺服器又會將每個消息回送給客戶端。雖然它本身看起來好像用處不大,但它充分地體現了客戶端/伺服器系統中典型的請求-響應交互模式

三、編寫 Echo 伺服器

所有的 Netty 伺服器都需要以下兩部分。

  • 至少一個 ChannelHandler—該組件實現了伺服器對從客戶端接收的數據的處理,即它的業務邏輯。
  • 引導—這是配置伺服器的啟動代碼。至少,它會將伺服器綁定到它要監聽連接請求的埠上。

3.1 ChannelHandler 和業務邏輯

上一篇博文我們介紹了 Future 和回調,並且闡述了它們在事件驅動設計中的應用。我們還討論了 ChannelHandler,它是一個介面族的父介面,它的實現負責接收並響應事件通知。

在 Netty 應用程式中,所有的數據處理邏輯都包含在這些核心抽象的實現中。因為你的 Echo 伺服器會響應傳入的消息,所以它需要實現ChannelInboundHandler 介面,用來定義響應入站事件的方法。簡單的應用程式只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的預設實現。

我們將要用到的方法是:

  • channelRead() :對於每個傳入的消息都要調用;
  • channelReadComplete() : 通知ChannelInboundHandler最後一次對channelRead()的調用是當前批量讀取中的最後一條消息;
  • exceptionCaught() :在讀取操作期間,有異常拋出時會調用。

該 Echo 伺服器的 ChannelHandler 實現是 EchoServerHandler,如代碼:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:05
 * @notes Netty Echo服務端簡單邏輯
 */

//表示channel可以並多個實例共用,它是線程安全的
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        //將消息列印到控制台
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        //將收到的消息寫給發送者,而不沖刷出站消息
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //將未決消息沖刷到遠程節點,並且關閉該 Channe
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //列印異常堆棧跟蹤
        cause.printStackTrace();
        //關閉該channel
        ctx.close();
    }
}

ChannelInboundHandlerAdapter 有一個直觀的 API,並且它的每個方法都可以被重寫以掛鉤到事件生命周期的恰當點上。

因為需要處理所有接收到的數據,所以我們重寫了 channelRead() 方法。在這個伺服器應用程式中,我們將數據簡單地回送給了遠程節點。

重寫 exceptionCaught() 方法允許我們對 Throwable 的任何子類型做出反應,在這裡你記錄了異常並關閉了連接。

雖然一個更加完善的應用程式也許會嘗試從異常中恢復,但在這個場景下,只是通過簡單地關閉連接來通知遠程節點發生了錯誤。

ps:如果不捕獲異常,會發生什麼呢?

每個 Channel 都擁有一個與之相關聯的 ChannelPipeline,其持有一個 ChannelHandler 的實例鏈。在預設的情況下,ChannelHandler 會把對它的方法的調用轉發給鏈中的下一個 ChannelHandler。因此,如果 exceptionCaught()方法沒有被該鏈中的某處實現,那麼所接收的異常將會被傳遞到 ChannelPipeline 的尾端並被記錄。為此,你的應用程式應該提供至少有一個實現exceptionCaught()方法的 ChannelHandler。

除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習ChannelHandler的子類型和實現。這些之後會一一說明,目前,我們只關註:

  • 針對不同類型的事件來調用 ChannelHandler;
  • 應用程式通過實現或者擴展 ChannelHandler 來掛鉤到事件的生命周期,並且提供自定義的應用程式邏輯;
  • 在架構上,ChannelHandler 有助於保持業務邏輯與網路處理代碼的分離。這簡化了開發過程,因為代碼必須不斷地演化以響應不斷變化的需求。

3.2 引導伺服器

下麵我們準備開始構建伺服器。構建伺服器涉及到兩個內容:

  • 綁定到伺服器將在其上監聽並接受傳入連接請求的埠;
  • 配置 Channel,以將有關的入站消息通知給 EchoServerHandler 實例。
package com.example.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:21
 * @notes Netty引導伺服器
 */
public class EchoServer {

    public static void main(String[] args) throws Exception {
        //調用伺服器的 start()方法
        new EchoServer().start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //創建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建ServerBootstra
            ServerBootstrap b = new ServerBootstrap();
            //指定伺服器監視埠
             int port = 8080;
            b.group(group)
                    //指定所使用的 NIO 傳輸 Channel
                    //因為我們正在使用的是 NIO 傳輸,所以你指定了 NioEventLoopGroup 來接受和處理新的連接,
                    // 並且將 Channel 的類型指定為 NioServerSocketChannel 。
                    .channel(NioServerSocketChannel.class)
                    //使用指定的埠設置套接字地址
                    //將本地地址設置為一個具有選定埠的 InetSocketAddress 。伺服器將綁定到這個地址以監聽新的連接請求
                    .localAddress(new InetSocketAddress(port))
                    //添加一個EchoServerHandler 到子Channel的 ChannelPipeline
                    //這裡使用了一個特殊的類——ChannelInitializer。這是關鍵。
                    // 當一個新的連接被接受時,一個新的子 Channel 將會被創建,而 ChannelInitializer 將會把一個你的
                    //EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中。正如我們之前所解釋的,
                    // 這個 ChannelHandler 將會收到有關入站消息的通知。
                    .childHandler(new ChannelInitializer<SocketChannel>(){
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //EchoServerHandler 被標註為 @Shareable,所以我們可以總是使用同樣的實例
                            //實際上所有客戶端都是使用的同一個EchoServerHandler
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            //非同步地綁定伺服器,調用 sync()方法阻塞等待直到綁定完成
            //sync()方法的調用將導致當前 Thread阻塞,一直到綁定操作完成為止
            ChannelFuture f = b.bind().sync();
            //獲取 Channel 的CloseFuture,並且阻塞當前線
            //該應用程式將會阻塞等待直到伺服器的 Channel關閉(因為你在 Channel 的 CloseFuture 上調用了 sync()方法)
            f.channel().closeFuture().sync();
        } finally {
            //關閉 EventLoopGroup,釋放所有的資源,包括所有被創建的線程
            group.shutdownGracefully().sync();
        }
    }
}

我們總結一下伺服器實現中的重要步驟。下麵這些是伺服器的主要代碼組件:

  • EchoServerHandler 實現了業務邏輯;
  • main()方法引導了伺服器;
    引導過程中所需要的步驟如下:
    • 創建一個 ServerBootstrap 的實例以引導和綁定伺服器;
    • 創建並分配一個 NioEventLoopGroup 實例以進行事件的處理,如接受新連接以及讀/寫數據;
    • 指定伺服器綁定的本地的 InetSocketAddress;
    • 使用一個 EchoServerHandler 的實例初始化每一個新的 Channel;
    • 調用 ServerBootstrap.bind()方法以綁定伺服器。

到此我們的引導伺服器已經完成。

四、編寫 Echo 客戶端

Echo 客戶端將會:
(1)連接到伺服器;
(2)發送一個或者多個消息;
(3)對於每個消息,等待並接收從伺服器發回的相同的消息;
(4)關閉連接。
編寫客戶端所涉及的兩個主要代碼部分也是業務邏輯和引導,和你在伺服器中看到的一樣。

4.1 通過 ChannelHandler 實現客戶端邏輯

如同伺服器,客戶端將擁有一個用來處理數據的 ChannelInboundHandler。在這個場景下,我們將擴展 SimpleChannelInboundHandler 類以處理所有必須的任務。這要求重寫下麵的方法:

  • channelActive() : 在到伺服器的連接已經建立之後將被調用;
  • channelRead0() : 當從伺服器接收到一條消息時被調用;
  • exceptionCaught() :在處理過程中引發異常時被調用。

具體代碼可以參考如下:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:45
 * @notes Netty 簡單的客戶端邏輯
 */

//標記該類的實例可以被多個 Channel 共用
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    //當被通知 Channel是活躍的時候,發送一條消息
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    //記錄已接收消息的轉儲
    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
        System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
    }

    //在發生異常時,記錄錯誤並關閉Channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

首先,我們重寫了 channelActive() 方法,其將在一個連接建立時被調用。這確保了數據將會被儘可能快地寫入伺服器,其在這個場景下是一個編碼了字元串"Netty rocks!"的位元組緩衝區。

接下來,我們重寫了 channelRead0() 方法。每當接收數據時,都會調用這個方法。由伺服器發送的消息可能會被分塊接收。也就是說,如果伺服器發送了 5 位元組,那麼不能保證這 5 位元組會被一次性接收。即使是對於這麼少量的數據,channelRead0()方法也可能會被調用兩次,第一次使用一個持有 3 位元組的 ByteBuf(Netty 的位元組容器),第二次使用一個持有 2 位元組的 ByteBuf。作為一個面向流的協議,TCP 保證了位元組數組將會按照伺服器發送它們的順序被接收。

ps:所以channelRead0()的調用次數不一定等於伺服器發佈消息的次數

重寫的第三個方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的代碼示例)中所示,記錄 Throwable,關閉 Channel,在這個場景下,終止到伺服器的連接。

ps:為什麼客戶端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?

在客戶端,當 channelRead0()方法完成時,我們已經有了傳入消息,並且已經處理完它了。當該方法返回時,SimpleChannelInboundHandler 負責釋放指向保存該消息的 ByteBuf 的記憶體引用。

在 EchoServerHandler 中,我們仍然需要將傳入消息回送給發送者,而 write()操作是非同步的,直到 channelRead()方法返回後可能仍然沒有完成。為此,EchoServerHandler擴展了 ChannelInboundHandlerAdapter,其在這個時間點上不會釋放消息。消息在 EchoServerHandler 的 channelReadComplete()方法中,當 writeAndFlush()方法被調用時被釋放。

4.2 引導客戶端

引導客戶端類似於引導伺服器,不同的是,客戶端是使用主機和埠參數來連接遠程地址,也就是這裡的 Echo 伺服器的地址,而不是綁定到一個一直被監聽的埠。

package com.example.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:59
 * @notes 引導客戶端
 */
public class EchoClient {
  
    public void start() throws Exception {
        //指定 EventLoopGroup 以處理客戶端事件;需要適用於 NIO 的實現
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建 Bootstrap
            Bootstrap b = new Bootstrap();
            b.group(group)
                    //適用於 NIO 傳輸的 Channel 類型
                    .channel(NioSocketChannel.class)
                    //設置伺服器的InetSocketAddress
                    .remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //在創建Channel時,向 ChannelPipeline中添加一個 EchoClientHandler 實例
                            ch.pipeline().addLast(new EchoClientHandler());}
                    });
            //連接到遠程節點,阻塞等待直到連接完成
            ChannelFuture f = b.connect().sync();
            //阻塞,直到Channel 關閉
            f.channel().closeFuture().sync();
        } finally {
            //關閉線程池並且釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }
 public static void main(String[] args) throws Exception {
        new EchoClient().start();
    }
}

總結一下要點:

  • 為初始化客戶端,創建了一個 Bootstrap 實例;
  • 為進行事件處理分配了一個 NioEventLoopGroup 實例,其中事件處理包括創建新的連接以及處理入站和出站數據;
  • 為伺服器連接創建了一個 InetSocketAddress 實例;
  • 當連接被建立時,一個 EchoClientHandler 實例會被安裝到(該 Channel 的)
    ChannelPipeline 中;
  • 在一切都設置完成後,調用 Bootstrap.connect()方法連接到遠程節點;

綜上客戶端的構建已經完成。

五、構建和運行 Echo 伺服器和客戶端

將我們上面的代碼複製到IDEA中運行,先啟動服務端在啟動客戶端會得到以下預期效果:

服務端控制台列印:
在這裡插入圖片描述
客戶端控制台列印:
在這裡插入圖片描述
我們關閉服務端後,客戶端控制台列印:
在這裡插入圖片描述
因為服務端關閉,觸發了客戶端 EchoClientHandler 中的exceptionCaught()方法,列印出了異常堆棧並關閉了連接。

這隻是一個簡單的應用程式,但是它可以伸縮到支持數千個併發連接——每秒可以比普通的基於套接字的 Java 應用程式處理多得多的消息。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安 ...
  • 我們在學習 Java 基礎時就知道可以生成隨機數,可以為我們枯燥的學習增加那麼一丟丟的樂趣。本文就來介紹 Java 隨機數。 ...
  • ## 教程簡介 Ruby,一種簡單快捷的面向對象(面向對象程式設計)腳本語言,在20世紀90年代由日本人松本行弘(Yukihiro Matsumoto)開發,遵守GPL協議和Ruby License。它的靈感與特性來自於 Perl、Smalltalk、Eiffel、Ada以及 Lisp 語言。由 R ...
  • # 一、Java配置線程池 ## 1、線程池==分類==、其他 ### 1.1、分類 ==IO密集型 和 CPU密集型== 任務的特點不同,因此針對不同類型的任務,選擇不同類型的線程池可以獲得更好的性能表現。 #### 1.1. IO密集型任務 ​ IO密集型任務的特點是需要頻繁讀寫磁碟、網路或者其 ...
  • 一、準備一下 開發環境 Pycharm python 3.8 ffmpeg 模塊的使用 requests re subprocess 二、基本思路流程 1、明確需求 採集下破站視頻數據通過開發者工具進行抓包分析,分析破站視頻數據的來源。 開發者工具的使用 打開方式: 滑鼠右鍵點擊檢查選擇Networ ...
  • ## 前言 如題,這個小玩意,就是不限制你查的是哪張表,用的是什麼類。 我直接一把梭,嘎嘎給你一頓導出。 我知道,這是很多人都想過的, 至少我就收到很多人問過我這個類似的問題。 我也跟他們說了,但是他們就是不動手,其實真的很簡單。 不動手怎麼辦? 我出手唄。 不多說開搞 。 ## 正文 玩法很簡單。 ...
  • # 超輕量級 DynamicTableNameInnerInterceptor是mybatis-plug的一個攔截器插件,可以自己定義需要攔截的表單,然後對它進行加工,這時mybatis-plus就會把SQL代碼的表名加上你的這個裝飾。 # 封裝的思想 我們通常把mybatis做成一個包,公司其它同 ...
  • 用`markdown`寫文檔很方便,但是有個困擾的地方,就是標題的編號問題。 寫文檔的時候,經常會在中間插入新的標題和內容,所以手動管理編號的話,如果新的標題插在前面,則要調整後面所有的編號。 如果在文檔完成後再手動加上編號的話,不僅容易忘記, 而且有時候我們是在其他編輯器里編輯文檔再導出`mark ...
一周排行
    -Advertisement-
    Play Games
  • 在本篇教程中,我們學習瞭如何使用 Taurus.MVC WebMVC 框架創建一個簡單的頁面。 我們創建了一個控制器並編寫了一個用於呈現頁面的方法,然後創建了對應的視圖,並最終成功運行了應用程式。 在下一篇教程中,我們將繼續探索 Taurus.MVC WebMVC 框架的更多功能和用法。 ...
  • 一:背景 1. 講故事 很多.NET開發者在學習高級調試的時候,使用sos的命令輸出會發現這裡也看不懂那裡也看不懂,比如截圖中的這位朋友。 .NET高級調試屬於一個偏冷門的領域,國內可觀測的資料比較少,所以很多東西需要你自己去探究源代碼,然後用各種調試工具去驗證,相關源代碼如下: coreclr: ...
  • 我一直都以為c中除以2的n次方可以使用右移n位代替,然而在實際調試中發現並不都是這樣的。是在計算餘數是發現了異常 被除數:114325068 右移15計算結果:3488 除法取整計算結果:3489 右移操作計算餘數:33772 除法取整計算餘數:1005 顯然:這是不一樣的。 移位操作是一條cpu指 ...
  • 在上一篇文章中,我們介紹了ReentrantLock類的一些基本用法,今天我們重點來介紹一下ReentrantLock其它的常用方法,以便對ReentrantLock類的使用有更深入的理解。 ...
  • Excelize 是 Go 語言編寫的用於操作電子錶格辦公文檔的開源基礎庫,2024年2月26日,社區正式發佈了 2.8.1 版本,該版本包含了多項新增功能、錯誤修複和相容性提升優化。 ...
  • 雲採用框架(Cloud Adoption Framework,簡稱CAF)為企業上雲提供策略和技術的指導原則和最佳實踐,幫助企業上好雲、用好雲、管好雲,併成功實現業務目標。本雲採用框架是基於服務大量企業客戶的經驗總結,將企業雲採用分為四個階段,並詳細探討企業應在每個階段採取的業務和技術策略;同時,還 ...
  • 與TXT文本文件,PDF文件更加專業也更適合傳輸,常用於正式報告、簡歷、合同等場合。項目中如果有使用Java將TXT文本文件轉為PDF文件的需求,可以查看本文中介紹的免費實現方法。 免費Java PDF庫 本文介紹的方法需要用到Free Spire.PDF for Java,該免費庫支持多種操作、轉 ...
  • 指針和引用 當我們需要在程式中傳遞變數的地址時,可以使用指針或引用。它們都可以用來間接訪問變數,但它們之間有一些重要的區別。 指針是一個變數,它存儲另一個變數的地址。通過指針,我們可以訪問存儲在該地址中的變數。指針可以被重新分配,可以指向不同的變數,也可以為NULL。指針使用*運算符來訪問存儲在地址 ...
  • 即使再小再簡單的需求,作為研發開發完畢之後,我們可以直接上線麽?其實很多時候事故往往就是由於“不以為意”發生的。事故的發生往往也遵循“墨菲定律”,這就要求我們更要敬畏線上,再小的需求點都需要經過嚴格的測試驗證才能上線。 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 一、是什麼 許可權是對特定資源的訪問許可,所謂許可權控制,也就是確保用戶只能訪問到被分配的資源 而前端許可權歸根結底是請求的發起權,請求的發起可能有下麵兩種形式觸發 頁面載入觸發 頁面上的按鈕點擊觸發 總的來說,所有的請求發起都觸發自前端路由或 ...