Netty實戰(二)

来源:https://www.cnblogs.com/kimi77/archive/2023/05/24/17427536.html
-Advertisement-
Play Games

# 一、環境準備 Netty需要的運行環境很簡單,只有2個。 - JDK 1.8+ - Apache Maven 3.3.9+ # 二、Netty 客戶端/伺服器概覽 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/c49191e6ee6e448f8c525b450 ...


一、環境準備

Netty需要的運行環境很簡單,只有2個。

  • JDK 1.8+
  • Apache Maven 3.3.9+

二、Netty 客戶端/伺服器概覽

在這裡插入圖片描述
如圖,展示了一個我們將要編寫的 Echo 客戶端和伺服器應用程式。該圖展示是多個客戶端同時連接到一臺伺服器。所能夠支持的客戶端數量,在理論上,僅受限於系統的可用資源(以及所使用的 JDK 版本可能會施加的限制)。

Echo 客戶端和伺服器之間的交互是非常簡單的;在客戶端建立一個連接之後,它會向伺服器發送一個或多個消息,反過來伺服器又會將每個消息回送給客戶端。雖然它本身看起來好像用處不大,但它充分地體現了客戶端/伺服器系統中典型的請求-響應交互模式

三、編寫 Echo 伺服器

所有的 Netty 伺服器都需要以下兩部分。

  • 至少一個 ChannelHandler—該組件實現了伺服器對從客戶端接收的數據的處理,即它的業務邏輯。
  • 引導—這是配置伺服器的啟動代碼。至少,它會將伺服器綁定到它要監聽連接請求的埠上。

3.1 ChannelHandler 和業務邏輯

上一篇博文我們介紹了 Future 和回調,並且闡述了它們在事件驅動設計中的應用。我們還討論了 ChannelHandler,它是一個介面族的父介面,它的實現負責接收並響應事件通知。

在 Netty 應用程式中,所有的數據處理邏輯都包含在這些核心抽象的實現中。因為你的 Echo 伺服器會響應傳入的消息,所以它需要實現ChannelInboundHandler 介面,用來定義響應入站事件的方法。簡單的應用程式只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的預設實現。

我們將要用到的方法是:

  • channelRead() :對於每個傳入的消息都要調用;
  • channelReadComplete() : 通知ChannelInboundHandler最後一次對channelRead()的調用是當前批量讀取中的最後一條消息;
  • exceptionCaught() :在讀取操作期間,有異常拋出時會調用。

該 Echo 伺服器的 ChannelHandler 實現是 EchoServerHandler,如代碼:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:05
 * @notes Netty Echo服務端簡單邏輯
 */

//表示channel可以並多個實例共用,它是線程安全的
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        //將消息列印到控制台
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        //將收到的消息寫給發送者,而不沖刷出站消息
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //將未決消息沖刷到遠程節點,並且關閉該 Channe
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //列印異常堆棧跟蹤
        cause.printStackTrace();
        //關閉該channel
        ctx.close();
    }
}

ChannelInboundHandlerAdapter 有一個直觀的 API,並且它的每個方法都可以被重寫以掛鉤到事件生命周期的恰當點上。

因為需要處理所有接收到的數據,所以我們重寫了 channelRead() 方法。在這個伺服器應用程式中,我們將數據簡單地回送給了遠程節點。

重寫 exceptionCaught() 方法允許我們對 Throwable 的任何子類型做出反應,在這裡你記錄了異常並關閉了連接。

雖然一個更加完善的應用程式也許會嘗試從異常中恢復,但在這個場景下,只是通過簡單地關閉連接來通知遠程節點發生了錯誤。

ps:如果不捕獲異常,會發生什麼呢?

每個 Channel 都擁有一個與之相關聯的 ChannelPipeline,其持有一個 ChannelHandler 的實例鏈。在預設的情況下,ChannelHandler 會把對它的方法的調用轉發給鏈中的下一個 ChannelHandler。因此,如果 exceptionCaught()方法沒有被該鏈中的某處實現,那麼所接收的異常將會被傳遞到 ChannelPipeline 的尾端並被記錄。為此,你的應用程式應該提供至少有一個實現exceptionCaught()方法的 ChannelHandler。

除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習ChannelHandler的子類型和實現。這些之後會一一說明,目前,我們只關註:

  • 針對不同類型的事件來調用 ChannelHandler;
  • 應用程式通過實現或者擴展 ChannelHandler 來掛鉤到事件的生命周期,並且提供自定義的應用程式邏輯;
  • 在架構上,ChannelHandler 有助於保持業務邏輯與網路處理代碼的分離。這簡化了開發過程,因為代碼必須不斷地演化以響應不斷變化的需求。

3.2 引導伺服器

下麵我們準備開始構建伺服器。構建伺服器涉及到兩個內容:

  • 綁定到伺服器將在其上監聽並接受傳入連接請求的埠;
  • 配置 Channel,以將有關的入站消息通知給 EchoServerHandler 實例。
package com.example.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:21
 * @notes Netty引導伺服器
 */
public class EchoServer {

    public static void main(String[] args) throws Exception {
        //調用伺服器的 start()方法
        new EchoServer().start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //創建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建ServerBootstra
            ServerBootstrap b = new ServerBootstrap();
            //指定伺服器監視埠
             int port = 8080;
            b.group(group)
                    //指定所使用的 NIO 傳輸 Channel
                    //因為我們正在使用的是 NIO 傳輸,所以你指定了 NioEventLoopGroup 來接受和處理新的連接,
                    // 並且將 Channel 的類型指定為 NioServerSocketChannel 。
                    .channel(NioServerSocketChannel.class)
                    //使用指定的埠設置套接字地址
                    //將本地地址設置為一個具有選定埠的 InetSocketAddress 。伺服器將綁定到這個地址以監聽新的連接請求
                    .localAddress(new InetSocketAddress(port))
                    //添加一個EchoServerHandler 到子Channel的 ChannelPipeline
                    //這裡使用了一個特殊的類——ChannelInitializer。這是關鍵。
                    // 當一個新的連接被接受時,一個新的子 Channel 將會被創建,而 ChannelInitializer 將會把一個你的
                    //EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中。正如我們之前所解釋的,
                    // 這個 ChannelHandler 將會收到有關入站消息的通知。
                    .childHandler(new ChannelInitializer<SocketChannel>(){
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //EchoServerHandler 被標註為 @Shareable,所以我們可以總是使用同樣的實例
                            //實際上所有客戶端都是使用的同一個EchoServerHandler
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            //非同步地綁定伺服器,調用 sync()方法阻塞等待直到綁定完成
            //sync()方法的調用將導致當前 Thread阻塞,一直到綁定操作完成為止
            ChannelFuture f = b.bind().sync();
            //獲取 Channel 的CloseFuture,並且阻塞當前線
            //該應用程式將會阻塞等待直到伺服器的 Channel關閉(因為你在 Channel 的 CloseFuture 上調用了 sync()方法)
            f.channel().closeFuture().sync();
        } finally {
            //關閉 EventLoopGroup,釋放所有的資源,包括所有被創建的線程
            group.shutdownGracefully().sync();
        }
    }
}

我們總結一下伺服器實現中的重要步驟。下麵這些是伺服器的主要代碼組件:

  • EchoServerHandler 實現了業務邏輯;
  • main()方法引導了伺服器;
    引導過程中所需要的步驟如下:
    • 創建一個 ServerBootstrap 的實例以引導和綁定伺服器;
    • 創建並分配一個 NioEventLoopGroup 實例以進行事件的處理,如接受新連接以及讀/寫數據;
    • 指定伺服器綁定的本地的 InetSocketAddress;
    • 使用一個 EchoServerHandler 的實例初始化每一個新的 Channel;
    • 調用 ServerBootstrap.bind()方法以綁定伺服器。

到此我們的引導伺服器已經完成。

四、編寫 Echo 客戶端

Echo 客戶端將會:
(1)連接到伺服器;
(2)發送一個或者多個消息;
(3)對於每個消息,等待並接收從伺服器發回的相同的消息;
(4)關閉連接。
編寫客戶端所涉及的兩個主要代碼部分也是業務邏輯和引導,和你在伺服器中看到的一樣。

4.1 通過 ChannelHandler 實現客戶端邏輯

如同伺服器,客戶端將擁有一個用來處理數據的 ChannelInboundHandler。在這個場景下,我們將擴展 SimpleChannelInboundHandler 類以處理所有必須的任務。這要求重寫下麵的方法:

  • channelActive() : 在到伺服器的連接已經建立之後將被調用;
  • channelRead0() : 當從伺服器接收到一條消息時被調用;
  • exceptionCaught() :在處理過程中引發異常時被調用。

具體代碼可以參考如下:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:45
 * @notes Netty 簡單的客戶端邏輯
 */

//標記該類的實例可以被多個 Channel 共用
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    //當被通知 Channel是活躍的時候,發送一條消息
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    //記錄已接收消息的轉儲
    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
        System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
    }

    //在發生異常時,記錄錯誤並關閉Channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

首先,我們重寫了 channelActive() 方法,其將在一個連接建立時被調用。這確保了數據將會被儘可能快地寫入伺服器,其在這個場景下是一個編碼了字元串"Netty rocks!"的位元組緩衝區。

接下來,我們重寫了 channelRead0() 方法。每當接收數據時,都會調用這個方法。由伺服器發送的消息可能會被分塊接收。也就是說,如果伺服器發送了 5 位元組,那麼不能保證這 5 位元組會被一次性接收。即使是對於這麼少量的數據,channelRead0()方法也可能會被調用兩次,第一次使用一個持有 3 位元組的 ByteBuf(Netty 的位元組容器),第二次使用一個持有 2 位元組的 ByteBuf。作為一個面向流的協議,TCP 保證了位元組數組將會按照伺服器發送它們的順序被接收。

ps:所以channelRead0()的調用次數不一定等於伺服器發佈消息的次數

重寫的第三個方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的代碼示例)中所示,記錄 Throwable,關閉 Channel,在這個場景下,終止到伺服器的連接。

ps:為什麼客戶端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?

在客戶端,當 channelRead0()方法完成時,我們已經有了傳入消息,並且已經處理完它了。當該方法返回時,SimpleChannelInboundHandler 負責釋放指向保存該消息的 ByteBuf 的記憶體引用。

在 EchoServerHandler 中,我們仍然需要將傳入消息回送給發送者,而 write()操作是非同步的,直到 channelRead()方法返回後可能仍然沒有完成。為此,EchoServerHandler擴展了 ChannelInboundHandlerAdapter,其在這個時間點上不會釋放消息。消息在 EchoServerHandler 的 channelReadComplete()方法中,當 writeAndFlush()方法被調用時被釋放。

4.2 引導客戶端

引導客戶端類似於引導伺服器,不同的是,客戶端是使用主機和埠參數來連接遠程地址,也就是這裡的 Echo 伺服器的地址,而不是綁定到一個一直被監聽的埠。

package com.example.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:59
 * @notes 引導客戶端
 */
public class EchoClient {
  
    public void start() throws Exception {
        //指定 EventLoopGroup 以處理客戶端事件;需要適用於 NIO 的實現
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建 Bootstrap
            Bootstrap b = new Bootstrap();
            b.group(group)
                    //適用於 NIO 傳輸的 Channel 類型
                    .channel(NioSocketChannel.class)
                    //設置伺服器的InetSocketAddress
                    .remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //在創建Channel時,向 ChannelPipeline中添加一個 EchoClientHandler 實例
                            ch.pipeline().addLast(new EchoClientHandler());}
                    });
            //連接到遠程節點,阻塞等待直到連接完成
            ChannelFuture f = b.connect().sync();
            //阻塞,直到Channel 關閉
            f.channel().closeFuture().sync();
        } finally {
            //關閉線程池並且釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }
 public static void main(String[] args) throws Exception {
        new EchoClient().start();
    }
}

總結一下要點:

  • 為初始化客戶端,創建了一個 Bootstrap 實例;
  • 為進行事件處理分配了一個 NioEventLoopGroup 實例,其中事件處理包括創建新的連接以及處理入站和出站數據;
  • 為伺服器連接創建了一個 InetSocketAddress 實例;
  • 當連接被建立時,一個 EchoClientHandler 實例會被安裝到(該 Channel 的)
    ChannelPipeline 中;
  • 在一切都設置完成後,調用 Bootstrap.connect()方法連接到遠程節點;

綜上客戶端的構建已經完成。

五、構建和運行 Echo 伺服器和客戶端

將我們上面的代碼複製到IDEA中運行,先啟動服務端在啟動客戶端會得到以下預期效果:

服務端控制台列印:
在這裡插入圖片描述
客戶端控制台列印:
在這裡插入圖片描述
我們關閉服務端後,客戶端控制台列印:
在這裡插入圖片描述
因為服務端關閉,觸發了客戶端 EchoClientHandler 中的exceptionCaught()方法,列印出了異常堆棧並關閉了連接。

這隻是一個簡單的應用程式,但是它可以伸縮到支持數千個併發連接——每秒可以比普通的基於套接字的 Java 應用程式處理多得多的消息。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安 ...
  • 我們在學習 Java 基礎時就知道可以生成隨機數,可以為我們枯燥的學習增加那麼一丟丟的樂趣。本文就來介紹 Java 隨機數。 ...
  • ## 教程簡介 Ruby,一種簡單快捷的面向對象(面向對象程式設計)腳本語言,在20世紀90年代由日本人松本行弘(Yukihiro Matsumoto)開發,遵守GPL協議和Ruby License。它的靈感與特性來自於 Perl、Smalltalk、Eiffel、Ada以及 Lisp 語言。由 R ...
  • # 一、Java配置線程池 ## 1、線程池==分類==、其他 ### 1.1、分類 ==IO密集型 和 CPU密集型== 任務的特點不同,因此針對不同類型的任務,選擇不同類型的線程池可以獲得更好的性能表現。 #### 1.1. IO密集型任務 ​ IO密集型任務的特點是需要頻繁讀寫磁碟、網路或者其 ...
  • 一、準備一下 開發環境 Pycharm python 3.8 ffmpeg 模塊的使用 requests re subprocess 二、基本思路流程 1、明確需求 採集下破站視頻數據通過開發者工具進行抓包分析,分析破站視頻數據的來源。 開發者工具的使用 打開方式: 滑鼠右鍵點擊檢查選擇Networ ...
  • ## 前言 如題,這個小玩意,就是不限制你查的是哪張表,用的是什麼類。 我直接一把梭,嘎嘎給你一頓導出。 我知道,這是很多人都想過的, 至少我就收到很多人問過我這個類似的問題。 我也跟他們說了,但是他們就是不動手,其實真的很簡單。 不動手怎麼辦? 我出手唄。 不多說開搞 。 ## 正文 玩法很簡單。 ...
  • # 超輕量級 DynamicTableNameInnerInterceptor是mybatis-plug的一個攔截器插件,可以自己定義需要攔截的表單,然後對它進行加工,這時mybatis-plus就會把SQL代碼的表名加上你的這個裝飾。 # 封裝的思想 我們通常把mybatis做成一個包,公司其它同 ...
  • 用`markdown`寫文檔很方便,但是有個困擾的地方,就是標題的編號問題。 寫文檔的時候,經常會在中間插入新的標題和內容,所以手動管理編號的話,如果新的標題插在前面,則要調整後面所有的編號。 如果在文檔完成後再手動加上編號的話,不僅容易忘記, 而且有時候我們是在其他編輯器里編輯文檔再導出`mark ...
一周排行
    -Advertisement-
    Play Games
  • 前言 微服務架構已經成為搭建高效、可擴展系統的關鍵技術之一,然而,現有許多微服務框架往往過於複雜,使得我們普通開發者難以快速上手並體驗到微服務帶了的便利。為瞭解決這一問題,於是作者精心打造了一款最接地氣的 .NET 微服務框架,幫助我們輕鬆構建和管理微服務應用。 本框架不僅支持 Consul 服務註 ...
  • 先看一下效果吧: 如果不會寫動畫或者懶得寫動畫,就直接交給Blend來做吧; 其實Blend操作起來很簡單,有點類似於在操作PS,我們只需要設置關鍵幀,滑鼠點來點去就可以了,Blend會自動幫我們生成我們想要的動畫效果. 第一步:要創建一個空的WPF項目 第二步:右鍵我們的項目,在最下方有一個,在B ...
  • Prism:框架介紹與安裝 什麼是Prism? Prism是一個用於在 WPF、Xamarin Form、Uno 平臺和 WinUI 中構建鬆散耦合、可維護和可測試的 XAML 應用程式框架 Github https://github.com/PrismLibrary/Prism NuGet htt ...
  • 在WPF中,屏幕上的所有內容,都是通過畫筆(Brush)畫上去的。如按鈕的背景色,邊框,文本框的前景和形狀填充。藉助畫筆,可以繪製頁面上的所有UI對象。不同畫筆具有不同類型的輸出( 如:某些畫筆使用純色繪製區域,其他畫筆使用漸變、圖案、圖像或繪圖)。 ...
  • 前言 嗨,大家好!推薦一個基於 .NET 8 的高併發微服務電商系統,涵蓋了商品、訂單、會員、服務、財務等50多種實用功能。 項目不僅使用了 .NET 8 的最新特性,還集成了AutoFac、DotLiquid、HangFire、Nlog、Jwt、LayUIAdmin、SqlSugar、MySQL、 ...
  • 本文主要介紹攝像頭(相機)如何採集數據,用於類似攝像頭本地顯示軟體,以及流媒體數據傳輸場景如傳屏、視訊會議等。 攝像頭採集有多種方案,如AForge.NET、WPFMediaKit、OpenCvSharp、EmguCv、DirectShow.NET、MediaCaptre(UWP),網上一些文章以及 ...
  • 前言 Seal-Report 是一款.NET 開源報表工具,擁有 1.4K Star。它提供了一個完整的框架,使用 C# 編寫,最新的版本採用的是 .NET 8.0 。 它能夠高效地從各種資料庫或 NoSQL 數據源生成日常報表,並支持執行複雜的報表任務。 其簡單易用的安裝過程和直觀的設計界面,我們 ...
  • 背景需求: 系統需要對接到XXX官方的API,但因此官方對接以及管理都十分嚴格。而本人部門的系統中包含諸多子系統,系統間為了穩定,程式間多數固定Token+特殊驗證進行調用,且後期還要提供給其他兄弟部門系統共同調用。 原則上:每套系統都必須單獨接入到官方,但官方的接入複雜,還要官方指定機構認證的證書 ...
  • 本文介紹下電腦設備關機的情況下如何通過網路喚醒設備,之前電源S狀態 電腦Power電源狀態- 唐宋元明清2188 - 博客園 (cnblogs.com) 有介紹過遠程喚醒設備,後面這倆天瞭解多了點所以單獨加個隨筆 設備關機的情況下,使用網路喚醒的前提條件: 1. 被喚醒設備需要支持這WakeOnL ...
  • 前言 大家好,推薦一個.NET 8.0 為核心,結合前端 Vue 框架,實現了前後端完全分離的設計理念。它不僅提供了強大的基礎功能支持,如許可權管理、代碼生成器等,還通過採用主流技術和最佳實踐,顯著降低了開發難度,加快了項目交付速度。 如果你需要一個高效的開發解決方案,本框架能幫助大家輕鬆應對挑戰,實 ...