使用Netty框架完成客戶端和服務端收發Protobuf消息

来源:https://www.cnblogs.com/dk168/archive/2023/03/26/17259124.html
-Advertisement-
Play Games

電腦組成原理 哈工大 劉巨集偉 b站課程地址:https://www.bilibili.com/video/BV1t4411e7LH/?spm_id_from=333.337.search-card.all.click 編譯原理 哈工大 陳鄞 b站課程地址:https://www.bilibili. ...


前言

本周繼續學習尼恩編著的《Netty、Redis、ZooKeeper高併發實戰》,一些資源也貼在這裡,自己以後想看還可以找到,這個是在博客園的一個入口https://www.cnblogs.com/crazymakercircle/p/9904544.html。
這周主要學習了Netty客戶端和服務端通信,書是由淺入深的在進行,從Socket NOI通信到 Reactor反應器模式,再到Netty框架,示例代碼都在https://gitee.com/crazymaker/netty_redis_zookeeper_source_code.git 中可以看到,書結合源代碼,自己在動手試驗一下,感覺還是有些收穫。 今天的示例代碼就是實踐出一個客戶端和服務端傳遞protobuf的例子。

Netty 服務端

先來看一下服務端代碼,


@Slf4j
public class ProtoBufServer {

    private final int serverPort;
    ServerBootstrap b = new ServerBootstrap();

    public ProtoBufServer(int port) {
        this.serverPort = port;
    }

    public void runServer() {
        //創建reactor 線程組
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設置reactor 線程組
            b.group(bossLoopGroup, workerLoopGroup);
            //2 設置nio類型的channel
            b.channel(NioServerSocketChannel.class);
            //3 設置監聽埠
            b.localAddress(serverPort);
            //4 設置通道的參數
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 裝配子通道流水線
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有連接到達時會創建一個channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水線添加3個handler處理器

                    // protobufDecoder僅僅負責編碼,並不支持讀半包,所以在之前,一定要有讀半包的處理器。
                    // 有三種方式可以選擇:
                    // 使用netty提供ProtobufVarint32FrameDecoder
                    // 繼承netty提供的通用半包處理器 LengthFieldBasedFrameDecoder
                    // 繼承ByteToMessageDecoder類,自己處理半包

                    // 半包的處理
                    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    // 需要解碼的目標類
                    ch.pipeline().addLast(new ProtobufDecoder(MarketPriceProto.MarketPrice.getDefaultInstance()));
                    ch.pipeline().addLast(new ProtobufBussinessHandler());
                }
            });
            // 6 開始綁定server
            // 通過調用sync同步方法阻塞直到綁定成功
            ChannelFuture channelFuture = b.bind().sync();
            log.info(" 伺服器啟動成功,監聽埠: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括創建的線程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

    //伺服器端業務處理器
    static class ProtobufBussinessHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            MarketPriceProto.MarketPrice protoMsg = (MarketPriceProto.MarketPrice) msg;
            //經過pipeline的各個decoder,到此Person類型已經可以斷定
            log.info("收到一個 MsgProtos.Msg 數據包 =》");
            log.info("protoMsg.getId():=" + protoMsg.getId());
            log.info("protoMsg.getClose():=" + protoMsg.getClose());
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = SERVER_PORT;
        new ProtoBufServer(port).runServer();
    }
}

代碼中有註釋解釋,我在這裡加一下說明
代碼中有兩個EventLoopGroup bossLoopGroup和EventLoopGroup workerLoopGroup,使用兩個是什麼原因呢? 一個是負責處理連接監聽事件, 一個負責處理數據IO事件和Handler業務處理,通俗點解釋就是一個負責接客,一個負責服務客戶。如果只有一個人就會忙不過來,讓後面的人等很久。
b.childHandler這個就是我們具體的如何處理接收到的消息,他們都繼承ChannelInboundHandlerAdapter,通過PipeLine把消息進行處理。我們從通道裡面拿到的都是位元組碼,那麼要轉成我們需要的Protobuf類,就需要用到這些處理類
前兩個處理類ProtobufVarint32FrameDecoder和ProtobufDecoder都是Netty提供的,一個是為瞭解決半包問題,半包問題是因為在TCP傳輸的時候對數據包進行了拆包或者分包,收到的時候如果直接處理,就會有問題,需要我們在應用層進行二次封裝。 在這個示例中如果我們不使用ProtobufVarint32FrameDecoder,客戶端也不用,那麼就會出現有的可以解析出來,有的報錯的情況: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
可以用搬家來打比分,我們把一個家從一個地方搬到另外一個地方,還要求佈局一樣,汽車運輸的時候可能要分好幾次,那麼我們可以先記住位置,然後隨便裝車搬過去,過去後先暫存,再按記錄的位置進行還原,這樣來保證一模一樣。
ProtobufBussinessHandler 是我們自定義的Handler繼承了ChannelInboundHandlerAdapter,通過它我們拿到channel裡面的數據,轉換成我們的具體的Protobuf對象。這裡只是簡單的列印出來。 如果要把它繼續傳下去,需要調用 super.channelRead(ctx,msg)傳遞下去。

客戶端


@Slf4j
public class ProtoBufScanClient {
    private int serverPort;
    private String serverIp;
    Bootstrap b = new Bootstrap();

    public ProtoBufScanClient(String ip, int port) {
        this.serverPort = port;
        this.serverIp = ip;
    }

    public void runClient() {
        //創建reactor 線程組
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設置reactor 線程組
            b.group(workerLoopGroup);
            //2 設置nio類型的channel
            b.channel(NioSocketChannel.class);
            //3 設置監聽埠
            b.remoteAddress(serverIp, serverPort);
            //4 設置通道的參數
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 裝配通道流水線
            b.handler(new ChannelInitializer<SocketChannel>() {
                //初始化客戶端channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 客戶端channel流水線添加2個handler處理器
                    ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    ch.pipeline().addLast(new ProtobufEncoder());
                }
            });
            ChannelFuture f = b.connect();
            f.addListener((ChannelFuture futureListener) ->
            {
                if (futureListener.isSuccess()) {
                    log.info("EchoClient客戶端連接成功!");

                } else {
                    log.info("EchoClient客戶端連接失敗!");
                }

            });

            // 阻塞,直到連接完成
            f.sync();
            Channel channel = f.channel();


            Scanner scanner = new Scanner(System.in);
            log.info("請輸入發送內容:");


            GenericFutureListener sendCallBack = new GenericFutureListener() {

                @Override
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess()) {
                        log.info("發送成功!");

                    } else {
                        log.info("發送失敗!");
                    }
                }
            };


            while (scanner.hasNext()) {
                //獲取輸入的內容
                String next = scanner.next();
                String[] values = next.split(",");
                if(values.length != 5)
                {
                    log.info("格式不正確!");
                }
                else {
                    MarketPriceProto.MarketPrice msg = build(values);
                    ChannelFuture writeAndFlushFuture = channel.writeAndFlush(msg);
                    writeAndFlushFuture.addListener(sendCallBack);
                }

                log.info("請輸入發送內容:");

            }


            channel.flush();


            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture = channel.closeFuture();
            closeFuture.sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括創建的線程
            workerLoopGroup.shutdownGracefully();
        }

    }

    //構建ProtoBuf對象
    public MarketPriceProto.MarketPrice build(String[] values) {

        MarketPriceProto.MarketPrice.Builder builder = MarketPriceProto.MarketPrice.newBuilder();

        builder.setId(values[0]);


        builder.setOpen(Double.valueOf(values[1]));
        builder.setHigh(Double.valueOf(values[2]));
        builder.setLow(Double.valueOf(values[3]));
        builder.setClose(Double.valueOf(values[4]));

        return builder.build();
    }

    public static void main(String[] args) throws InterruptedException {
        int port = SERVER_PORT;
        String ip = SOCKET_SERVER_IP;
        new ProtoBufScanClient(ip, port).runClient();
    }
}

Netty客戶端代碼和服務端很接近,這裡它只用了一個線程組,客戶端只有它一個使用,和服務端模式不一樣,一個就夠了。
客戶端讀取控制台輸入數據,然後構造成MarketPriceProto.MarketPrice,它的Proto定義如下

// [開始聲明]
syntax = "proto3";
//定義protobuf的包名稱空間
package com.ken.netty.protocol;
// [結束聲明]

// [開始 java 選項配置]
option java_package = "com.ken.netty.protocol";
option java_outer_classname = "MarketPriceProto";
// [結束 java 選項配置]

// [開始 消息定義]
message MarketPrice {
  string id = 1;
  double open = 2;
  double high = 3;
  double low = 4;
  double close = 5;
}

channel.writeAndFlush(msg), 這裡我們直接把Protobuf對象傳遞進channel, ProtobufEncoder會對它進行編碼。

總結

代碼比較簡單,這是在有書的幫助和源代碼幫助下實現的,不通過書這個也不是這麼容易理解的。 作者還實現了一個及時通信的例子https://gitee.com/crazymaker/SimpleCrayIM。 也需要花些時間來學習一下。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇介紹在 QT 中使用 VLD 時,有多處記憶體泄漏時的輸出報告解析。 ...
  • 將用戶發來的指令以RESP協議的形式存儲在本地的AOF文件,重啟Redis後執行此文件恢複數據 https://github.com/csgopher/go-redis 本文涉及以下文件: redis.conf:配置文件 aof:實現aof redis.conf appendonly yes app ...
  • 原文鏈接: Go 語言 new 和 make 關鍵字的區別 本篇文章來介紹一道非常常見的面試題,到底有多常見呢?可能很多面試的開場白就是由此開始的。那就是 new 和 make 這兩個內置函數的區別。 其實這個問題本身並不複雜,簡單來說就是,new 只分配記憶體,而 make 只能用於 slice、m ...
  • 南昌航空大學-軟體學院-22206104-段清如-JAVA第一次Blog作業 前言: 這個學期才開始接觸java,到現在一個多月的時間,已經差不多可以寫出一些基本的簡單的程式了。對比上個學期學習的C語言,我認為java更加方便,方法更多,函數更多,但是時間效率上略遜一籌。在這一個月的java學習過程 ...
  • 在上一章中已經看到,odoo能夠為給定模型生成預設視圖。實際上,預設視圖對於業務應用程式來說是不可接受的。相反,我們至少應該以邏輯的方式組織各個欄位。 視圖是在帶有操作和菜單的XML文件中定義的。它們是ir.ui.view model的實例。 在我們的estate模塊中,我們需要以邏輯方式組織欄位: ...
  • 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇介紹在 QT 中使用 VLD 時,有一處記憶體泄漏時的輸出報告解析。 ...
  • 目錄 瞭解需求 方案 1:資料庫輪詢 方案 2:JDK 的延遲隊列 方案 3:時間輪演算法 方案 4:redis 緩存 方案 5:使用消息隊列 瞭解需求 在開發中,往往會遇到一些關於延時任務的需求。 例如 生成訂單 30 分鐘未支付,則自動取消 生成訂單 60 秒後,給用戶發簡訊 對上述的任務,我們給 ...
  • 操作系統 :Windows10_x64 、CentOS 7.6.1810_x64 wireshark版本:3.6.12 Python 版本 : 3.9.12 一、背景描述 工作中有時候會遇到需要從pcap抓包文件裡面提取音頻的情況,比如下麵這些場景: 從pcap文件裡面導出wav文件 從pcap文件 ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...