Netty實戰(二)

来源:https://www.cnblogs.com/kimi77/archive/2023/05/24/17427536.html
-Advertisement-
Play Games

# 一、環境準備 Netty需要的運行環境很簡單,只有2個。 - JDK 1.8+ - Apache Maven 3.3.9+ # 二、Netty 客戶端/伺服器概覽 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/c49191e6ee6e448f8c525b450 ...


一、環境準備

Netty需要的運行環境很簡單,只有2個。

  • JDK 1.8+
  • Apache Maven 3.3.9+

二、Netty 客戶端/伺服器概覽

在這裡插入圖片描述
如圖,展示了一個我們將要編寫的 Echo 客戶端和伺服器應用程式。該圖展示是多個客戶端同時連接到一臺伺服器。所能夠支持的客戶端數量,在理論上,僅受限於系統的可用資源(以及所使用的 JDK 版本可能會施加的限制)。

Echo 客戶端和伺服器之間的交互是非常簡單的;在客戶端建立一個連接之後,它會向伺服器發送一個或多個消息,反過來伺服器又會將每個消息回送給客戶端。雖然它本身看起來好像用處不大,但它充分地體現了客戶端/伺服器系統中典型的請求-響應交互模式

三、編寫 Echo 伺服器

所有的 Netty 伺服器都需要以下兩部分。

  • 至少一個 ChannelHandler—該組件實現了伺服器對從客戶端接收的數據的處理,即它的業務邏輯。
  • 引導—這是配置伺服器的啟動代碼。至少,它會將伺服器綁定到它要監聽連接請求的埠上。

3.1 ChannelHandler 和業務邏輯

上一篇博文我們介紹了 Future 和回調,並且闡述了它們在事件驅動設計中的應用。我們還討論了 ChannelHandler,它是一個介面族的父介面,它的實現負責接收並響應事件通知。

在 Netty 應用程式中,所有的數據處理邏輯都包含在這些核心抽象的實現中。因為你的 Echo 伺服器會響應傳入的消息,所以它需要實現ChannelInboundHandler 介面,用來定義響應入站事件的方法。簡單的應用程式只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的預設實現。

我們將要用到的方法是:

  • channelRead() :對於每個傳入的消息都要調用;
  • channelReadComplete() : 通知ChannelInboundHandler最後一次對channelRead()的調用是當前批量讀取中的最後一條消息;
  • exceptionCaught() :在讀取操作期間,有異常拋出時會調用。

該 Echo 伺服器的 ChannelHandler 實現是 EchoServerHandler,如代碼:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:05
 * @notes Netty Echo服務端簡單邏輯
 */

//表示channel可以並多個實例共用,它是線程安全的
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        //將消息列印到控制台
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        //將收到的消息寫給發送者,而不沖刷出站消息
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //將未決消息沖刷到遠程節點,並且關閉該 Channe
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //列印異常堆棧跟蹤
        cause.printStackTrace();
        //關閉該channel
        ctx.close();
    }
}

ChannelInboundHandlerAdapter 有一個直觀的 API,並且它的每個方法都可以被重寫以掛鉤到事件生命周期的恰當點上。

因為需要處理所有接收到的數據,所以我們重寫了 channelRead() 方法。在這個伺服器應用程式中,我們將數據簡單地回送給了遠程節點。

重寫 exceptionCaught() 方法允許我們對 Throwable 的任何子類型做出反應,在這裡你記錄了異常並關閉了連接。

雖然一個更加完善的應用程式也許會嘗試從異常中恢復,但在這個場景下,只是通過簡單地關閉連接來通知遠程節點發生了錯誤。

ps:如果不捕獲異常,會發生什麼呢?

每個 Channel 都擁有一個與之相關聯的 ChannelPipeline,其持有一個 ChannelHandler 的實例鏈。在預設的情況下,ChannelHandler 會把對它的方法的調用轉發給鏈中的下一個 ChannelHandler。因此,如果 exceptionCaught()方法沒有被該鏈中的某處實現,那麼所接收的異常將會被傳遞到 ChannelPipeline 的尾端並被記錄。為此,你的應用程式應該提供至少有一個實現exceptionCaught()方法的 ChannelHandler。

除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習ChannelHandler的子類型和實現。這些之後會一一說明,目前,我們只關註:

  • 針對不同類型的事件來調用 ChannelHandler;
  • 應用程式通過實現或者擴展 ChannelHandler 來掛鉤到事件的生命周期,並且提供自定義的應用程式邏輯;
  • 在架構上,ChannelHandler 有助於保持業務邏輯與網路處理代碼的分離。這簡化了開發過程,因為代碼必須不斷地演化以響應不斷變化的需求。

3.2 引導伺服器

下麵我們準備開始構建伺服器。構建伺服器涉及到兩個內容:

  • 綁定到伺服器將在其上監聽並接受傳入連接請求的埠;
  • 配置 Channel,以將有關的入站消息通知給 EchoServerHandler 實例。
package com.example.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:21
 * @notes Netty引導伺服器
 */
public class EchoServer {

    public static void main(String[] args) throws Exception {
        //調用伺服器的 start()方法
        new EchoServer().start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //創建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建ServerBootstra
            ServerBootstrap b = new ServerBootstrap();
            //指定伺服器監視埠
             int port = 8080;
            b.group(group)
                    //指定所使用的 NIO 傳輸 Channel
                    //因為我們正在使用的是 NIO 傳輸,所以你指定了 NioEventLoopGroup 來接受和處理新的連接,
                    // 並且將 Channel 的類型指定為 NioServerSocketChannel 。
                    .channel(NioServerSocketChannel.class)
                    //使用指定的埠設置套接字地址
                    //將本地地址設置為一個具有選定埠的 InetSocketAddress 。伺服器將綁定到這個地址以監聽新的連接請求
                    .localAddress(new InetSocketAddress(port))
                    //添加一個EchoServerHandler 到子Channel的 ChannelPipeline
                    //這裡使用了一個特殊的類——ChannelInitializer。這是關鍵。
                    // 當一個新的連接被接受時,一個新的子 Channel 將會被創建,而 ChannelInitializer 將會把一個你的
                    //EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中。正如我們之前所解釋的,
                    // 這個 ChannelHandler 將會收到有關入站消息的通知。
                    .childHandler(new ChannelInitializer<SocketChannel>(){
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //EchoServerHandler 被標註為 @Shareable,所以我們可以總是使用同樣的實例
                            //實際上所有客戶端都是使用的同一個EchoServerHandler
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            //非同步地綁定伺服器,調用 sync()方法阻塞等待直到綁定完成
            //sync()方法的調用將導致當前 Thread阻塞,一直到綁定操作完成為止
            ChannelFuture f = b.bind().sync();
            //獲取 Channel 的CloseFuture,並且阻塞當前線
            //該應用程式將會阻塞等待直到伺服器的 Channel關閉(因為你在 Channel 的 CloseFuture 上調用了 sync()方法)
            f.channel().closeFuture().sync();
        } finally {
            //關閉 EventLoopGroup,釋放所有的資源,包括所有被創建的線程
            group.shutdownGracefully().sync();
        }
    }
}

我們總結一下伺服器實現中的重要步驟。下麵這些是伺服器的主要代碼組件:

  • EchoServerHandler 實現了業務邏輯;
  • main()方法引導了伺服器;
    引導過程中所需要的步驟如下:
    • 創建一個 ServerBootstrap 的實例以引導和綁定伺服器;
    • 創建並分配一個 NioEventLoopGroup 實例以進行事件的處理,如接受新連接以及讀/寫數據;
    • 指定伺服器綁定的本地的 InetSocketAddress;
    • 使用一個 EchoServerHandler 的實例初始化每一個新的 Channel;
    • 調用 ServerBootstrap.bind()方法以綁定伺服器。

到此我們的引導伺服器已經完成。

四、編寫 Echo 客戶端

Echo 客戶端將會:
(1)連接到伺服器;
(2)發送一個或者多個消息;
(3)對於每個消息,等待並接收從伺服器發回的相同的消息;
(4)關閉連接。
編寫客戶端所涉及的兩個主要代碼部分也是業務邏輯和引導,和你在伺服器中看到的一樣。

4.1 通過 ChannelHandler 實現客戶端邏輯

如同伺服器,客戶端將擁有一個用來處理數據的 ChannelInboundHandler。在這個場景下,我們將擴展 SimpleChannelInboundHandler 類以處理所有必須的任務。這要求重寫下麵的方法:

  • channelActive() : 在到伺服器的連接已經建立之後將被調用;
  • channelRead0() : 當從伺服器接收到一條消息時被調用;
  • exceptionCaught() :在處理過程中引發異常時被調用。

具體代碼可以參考如下:

package com.example.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * @author lhd
 * @date 2023/05/16 15:45
 * @notes Netty 簡單的客戶端邏輯
 */

//標記該類的實例可以被多個 Channel 共用
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    //當被通知 Channel是活躍的時候,發送一條消息
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    //記錄已接收消息的轉儲
    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
        System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
    }

    //在發生異常時,記錄錯誤並關閉Channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

首先,我們重寫了 channelActive() 方法,其將在一個連接建立時被調用。這確保了數據將會被儘可能快地寫入伺服器,其在這個場景下是一個編碼了字元串"Netty rocks!"的位元組緩衝區。

接下來,我們重寫了 channelRead0() 方法。每當接收數據時,都會調用這個方法。由伺服器發送的消息可能會被分塊接收。也就是說,如果伺服器發送了 5 位元組,那麼不能保證這 5 位元組會被一次性接收。即使是對於這麼少量的數據,channelRead0()方法也可能會被調用兩次,第一次使用一個持有 3 位元組的 ByteBuf(Netty 的位元組容器),第二次使用一個持有 2 位元組的 ByteBuf。作為一個面向流的協議,TCP 保證了位元組數組將會按照伺服器發送它們的順序被接收。

ps:所以channelRead0()的調用次數不一定等於伺服器發佈消息的次數

重寫的第三個方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的代碼示例)中所示,記錄 Throwable,關閉 Channel,在這個場景下,終止到伺服器的連接。

ps:為什麼客戶端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?

在客戶端,當 channelRead0()方法完成時,我們已經有了傳入消息,並且已經處理完它了。當該方法返回時,SimpleChannelInboundHandler 負責釋放指向保存該消息的 ByteBuf 的記憶體引用。

在 EchoServerHandler 中,我們仍然需要將傳入消息回送給發送者,而 write()操作是非同步的,直到 channelRead()方法返回後可能仍然沒有完成。為此,EchoServerHandler擴展了 ChannelInboundHandlerAdapter,其在這個時間點上不會釋放消息。消息在 EchoServerHandler 的 channelReadComplete()方法中,當 writeAndFlush()方法被調用時被釋放。

4.2 引導客戶端

引導客戶端類似於引導伺服器,不同的是,客戶端是使用主機和埠參數來連接遠程地址,也就是這裡的 Echo 伺服器的地址,而不是綁定到一個一直被監聽的埠。

package com.example.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * @author lhd
 * @date 2023/05/16 15:59
 * @notes 引導客戶端
 */
public class EchoClient {
  
    public void start() throws Exception {
        //指定 EventLoopGroup 以處理客戶端事件;需要適用於 NIO 的實現
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建 Bootstrap
            Bootstrap b = new Bootstrap();
            b.group(group)
                    //適用於 NIO 傳輸的 Channel 類型
                    .channel(NioSocketChannel.class)
                    //設置伺服器的InetSocketAddress
                    .remoteAddress(new InetSocketAddress("127.0.0.1", 8080))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //在創建Channel時,向 ChannelPipeline中添加一個 EchoClientHandler 實例
                            ch.pipeline().addLast(new EchoClientHandler());}
                    });
            //連接到遠程節點,阻塞等待直到連接完成
            ChannelFuture f = b.connect().sync();
            //阻塞,直到Channel 關閉
            f.channel().closeFuture().sync();
        } finally {
            //關閉線程池並且釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }
 public static void main(String[] args) throws Exception {
        new EchoClient().start();
    }
}

總結一下要點:

  • 為初始化客戶端,創建了一個 Bootstrap 實例;
  • 為進行事件處理分配了一個 NioEventLoopGroup 實例,其中事件處理包括創建新的連接以及處理入站和出站數據;
  • 為伺服器連接創建了一個 InetSocketAddress 實例;
  • 當連接被建立時,一個 EchoClientHandler 實例會被安裝到(該 Channel 的)
    ChannelPipeline 中;
  • 在一切都設置完成後,調用 Bootstrap.connect()方法連接到遠程節點;

綜上客戶端的構建已經完成。

五、構建和運行 Echo 伺服器和客戶端

將我們上面的代碼複製到IDEA中運行,先啟動服務端在啟動客戶端會得到以下預期效果:

服務端控制台列印:
在這裡插入圖片描述
客戶端控制台列印:
在這裡插入圖片描述
我們關閉服務端後,客戶端控制台列印:
在這裡插入圖片描述
因為服務端關閉,觸發了客戶端 EchoClientHandler 中的exceptionCaught()方法,列印出了異常堆棧並關閉了連接。

這隻是一個簡單的應用程式,但是它可以伸縮到支持數千個併發連接——每秒可以比普通的基於套接字的 Java 應用程式處理多得多的消息。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安裝方式親測好使的安 ...
  • 我們在學習 Java 基礎時就知道可以生成隨機數,可以為我們枯燥的學習增加那麼一丟丟的樂趣。本文就來介紹 Java 隨機數。 ...
  • ## 教程簡介 Ruby,一種簡單快捷的面向對象(面向對象程式設計)腳本語言,在20世紀90年代由日本人松本行弘(Yukihiro Matsumoto)開發,遵守GPL協議和Ruby License。它的靈感與特性來自於 Perl、Smalltalk、Eiffel、Ada以及 Lisp 語言。由 R ...
  • # 一、Java配置線程池 ## 1、線程池==分類==、其他 ### 1.1、分類 ==IO密集型 和 CPU密集型== 任務的特點不同,因此針對不同類型的任務,選擇不同類型的線程池可以獲得更好的性能表現。 #### 1.1. IO密集型任務 ​ IO密集型任務的特點是需要頻繁讀寫磁碟、網路或者其 ...
  • 一、準備一下 開發環境 Pycharm python 3.8 ffmpeg 模塊的使用 requests re subprocess 二、基本思路流程 1、明確需求 採集下破站視頻數據通過開發者工具進行抓包分析,分析破站視頻數據的來源。 開發者工具的使用 打開方式: 滑鼠右鍵點擊檢查選擇Networ ...
  • ## 前言 如題,這個小玩意,就是不限制你查的是哪張表,用的是什麼類。 我直接一把梭,嘎嘎給你一頓導出。 我知道,這是很多人都想過的, 至少我就收到很多人問過我這個類似的問題。 我也跟他們說了,但是他們就是不動手,其實真的很簡單。 不動手怎麼辦? 我出手唄。 不多說開搞 。 ## 正文 玩法很簡單。 ...
  • # 超輕量級 DynamicTableNameInnerInterceptor是mybatis-plug的一個攔截器插件,可以自己定義需要攔截的表單,然後對它進行加工,這時mybatis-plus就會把SQL代碼的表名加上你的這個裝飾。 # 封裝的思想 我們通常把mybatis做成一個包,公司其它同 ...
  • 用`markdown`寫文檔很方便,但是有個困擾的地方,就是標題的編號問題。 寫文檔的時候,經常會在中間插入新的標題和內容,所以手動管理編號的話,如果新的標題插在前面,則要調整後面所有的編號。 如果在文檔完成後再手動加上編號的話,不僅容易忘記, 而且有時候我們是在其他編輯器里編輯文檔再導出`mark ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...