使用Netty框架完成客戶端和服務端收發Protobuf消息

来源:https://www.cnblogs.com/dk168/archive/2023/03/26/17259124.html
-Advertisement-
Play Games

電腦組成原理 哈工大 劉巨集偉 b站課程地址:https://www.bilibili.com/video/BV1t4411e7LH/?spm_id_from=333.337.search-card.all.click 編譯原理 哈工大 陳鄞 b站課程地址:https://www.bilibili. ...


前言

本周繼續學習尼恩編著的《Netty、Redis、ZooKeeper高併發實戰》,一些資源也貼在這裡,自己以後想看還可以找到,這個是在博客園的一個入口https://www.cnblogs.com/crazymakercircle/p/9904544.html。
這周主要學習了Netty客戶端和服務端通信,書是由淺入深的在進行,從Socket NOI通信到 Reactor反應器模式,再到Netty框架,示例代碼都在https://gitee.com/crazymaker/netty_redis_zookeeper_source_code.git 中可以看到,書結合源代碼,自己在動手試驗一下,感覺還是有些收穫。 今天的示例代碼就是實踐出一個客戶端和服務端傳遞protobuf的例子。

Netty 服務端

先來看一下服務端代碼,


@Slf4j
public class ProtoBufServer {

    private final int serverPort;
    ServerBootstrap b = new ServerBootstrap();

    public ProtoBufServer(int port) {
        this.serverPort = port;
    }

    public void runServer() {
        //創建reactor 線程組
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設置reactor 線程組
            b.group(bossLoopGroup, workerLoopGroup);
            //2 設置nio類型的channel
            b.channel(NioServerSocketChannel.class);
            //3 設置監聽埠
            b.localAddress(serverPort);
            //4 設置通道的參數
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 裝配子通道流水線
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有連接到達時會創建一個channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水線添加3個handler處理器

                    // protobufDecoder僅僅負責編碼,並不支持讀半包,所以在之前,一定要有讀半包的處理器。
                    // 有三種方式可以選擇:
                    // 使用netty提供ProtobufVarint32FrameDecoder
                    // 繼承netty提供的通用半包處理器 LengthFieldBasedFrameDecoder
                    // 繼承ByteToMessageDecoder類,自己處理半包

                    // 半包的處理
                    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    // 需要解碼的目標類
                    ch.pipeline().addLast(new ProtobufDecoder(MarketPriceProto.MarketPrice.getDefaultInstance()));
                    ch.pipeline().addLast(new ProtobufBussinessHandler());
                }
            });
            // 6 開始綁定server
            // 通過調用sync同步方法阻塞直到綁定成功
            ChannelFuture channelFuture = b.bind().sync();
            log.info(" 伺服器啟動成功,監聽埠: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括創建的線程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

    //伺服器端業務處理器
    static class ProtobufBussinessHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            MarketPriceProto.MarketPrice protoMsg = (MarketPriceProto.MarketPrice) msg;
            //經過pipeline的各個decoder,到此Person類型已經可以斷定
            log.info("收到一個 MsgProtos.Msg 數據包 =》");
            log.info("protoMsg.getId():=" + protoMsg.getId());
            log.info("protoMsg.getClose():=" + protoMsg.getClose());
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = SERVER_PORT;
        new ProtoBufServer(port).runServer();
    }
}

代碼中有註釋解釋,我在這裡加一下說明
代碼中有兩個EventLoopGroup bossLoopGroup和EventLoopGroup workerLoopGroup,使用兩個是什麼原因呢? 一個是負責處理連接監聽事件, 一個負責處理數據IO事件和Handler業務處理,通俗點解釋就是一個負責接客,一個負責服務客戶。如果只有一個人就會忙不過來,讓後面的人等很久。
b.childHandler這個就是我們具體的如何處理接收到的消息,他們都繼承ChannelInboundHandlerAdapter,通過PipeLine把消息進行處理。我們從通道裡面拿到的都是位元組碼,那麼要轉成我們需要的Protobuf類,就需要用到這些處理類
前兩個處理類ProtobufVarint32FrameDecoder和ProtobufDecoder都是Netty提供的,一個是為瞭解決半包問題,半包問題是因為在TCP傳輸的時候對數據包進行了拆包或者分包,收到的時候如果直接處理,就會有問題,需要我們在應用層進行二次封裝。 在這個示例中如果我們不使用ProtobufVarint32FrameDecoder,客戶端也不用,那麼就會出現有的可以解析出來,有的報錯的情況: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
可以用搬家來打比分,我們把一個家從一個地方搬到另外一個地方,還要求佈局一樣,汽車運輸的時候可能要分好幾次,那麼我們可以先記住位置,然後隨便裝車搬過去,過去後先暫存,再按記錄的位置進行還原,這樣來保證一模一樣。
ProtobufBussinessHandler 是我們自定義的Handler繼承了ChannelInboundHandlerAdapter,通過它我們拿到channel裡面的數據,轉換成我們的具體的Protobuf對象。這裡只是簡單的列印出來。 如果要把它繼續傳下去,需要調用 super.channelRead(ctx,msg)傳遞下去。

客戶端


@Slf4j
public class ProtoBufScanClient {
    private int serverPort;
    private String serverIp;
    Bootstrap b = new Bootstrap();

    public ProtoBufScanClient(String ip, int port) {
        this.serverPort = port;
        this.serverIp = ip;
    }

    public void runClient() {
        //創建reactor 線程組
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設置reactor 線程組
            b.group(workerLoopGroup);
            //2 設置nio類型的channel
            b.channel(NioSocketChannel.class);
            //3 設置監聽埠
            b.remoteAddress(serverIp, serverPort);
            //4 設置通道的參數
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 裝配通道流水線
            b.handler(new ChannelInitializer<SocketChannel>() {
                //初始化客戶端channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 客戶端channel流水線添加2個handler處理器
                    ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    ch.pipeline().addLast(new ProtobufEncoder());
                }
            });
            ChannelFuture f = b.connect();
            f.addListener((ChannelFuture futureListener) ->
            {
                if (futureListener.isSuccess()) {
                    log.info("EchoClient客戶端連接成功!");

                } else {
                    log.info("EchoClient客戶端連接失敗!");
                }

            });

            // 阻塞,直到連接完成
            f.sync();
            Channel channel = f.channel();


            Scanner scanner = new Scanner(System.in);
            log.info("請輸入發送內容:");


            GenericFutureListener sendCallBack = new GenericFutureListener() {

                @Override
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess()) {
                        log.info("發送成功!");

                    } else {
                        log.info("發送失敗!");
                    }
                }
            };


            while (scanner.hasNext()) {
                //獲取輸入的內容
                String next = scanner.next();
                String[] values = next.split(",");
                if(values.length != 5)
                {
                    log.info("格式不正確!");
                }
                else {
                    MarketPriceProto.MarketPrice msg = build(values);
                    ChannelFuture writeAndFlushFuture = channel.writeAndFlush(msg);
                    writeAndFlushFuture.addListener(sendCallBack);
                }

                log.info("請輸入發送內容:");

            }


            channel.flush();


            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture = channel.closeFuture();
            closeFuture.sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括創建的線程
            workerLoopGroup.shutdownGracefully();
        }

    }

    //構建ProtoBuf對象
    public MarketPriceProto.MarketPrice build(String[] values) {

        MarketPriceProto.MarketPrice.Builder builder = MarketPriceProto.MarketPrice.newBuilder();

        builder.setId(values[0]);


        builder.setOpen(Double.valueOf(values[1]));
        builder.setHigh(Double.valueOf(values[2]));
        builder.setLow(Double.valueOf(values[3]));
        builder.setClose(Double.valueOf(values[4]));

        return builder.build();
    }

    public static void main(String[] args) throws InterruptedException {
        int port = SERVER_PORT;
        String ip = SOCKET_SERVER_IP;
        new ProtoBufScanClient(ip, port).runClient();
    }
}

Netty客戶端代碼和服務端很接近,這裡它只用了一個線程組,客戶端只有它一個使用,和服務端模式不一樣,一個就夠了。
客戶端讀取控制台輸入數據,然後構造成MarketPriceProto.MarketPrice,它的Proto定義如下

// [開始聲明]
syntax = "proto3";
//定義protobuf的包名稱空間
package com.ken.netty.protocol;
// [結束聲明]

// [開始 java 選項配置]
option java_package = "com.ken.netty.protocol";
option java_outer_classname = "MarketPriceProto";
// [結束 java 選項配置]

// [開始 消息定義]
message MarketPrice {
  string id = 1;
  double open = 2;
  double high = 3;
  double low = 4;
  double close = 5;
}

channel.writeAndFlush(msg), 這裡我們直接把Protobuf對象傳遞進channel, ProtobufEncoder會對它進行編碼。

總結

代碼比較簡單,這是在有書的幫助和源代碼幫助下實現的,不通過書這個也不是這麼容易理解的。 作者還實現了一個及時通信的例子https://gitee.com/crazymaker/SimpleCrayIM。 也需要花些時間來學習一下。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇介紹在 QT 中使用 VLD 時,有多處記憶體泄漏時的輸出報告解析。 ...
  • 將用戶發來的指令以RESP協議的形式存儲在本地的AOF文件,重啟Redis後執行此文件恢複數據 https://github.com/csgopher/go-redis 本文涉及以下文件: redis.conf:配置文件 aof:實現aof redis.conf appendonly yes app ...
  • 原文鏈接: Go 語言 new 和 make 關鍵字的區別 本篇文章來介紹一道非常常見的面試題,到底有多常見呢?可能很多面試的開場白就是由此開始的。那就是 new 和 make 這兩個內置函數的區別。 其實這個問題本身並不複雜,簡單來說就是,new 只分配記憶體,而 make 只能用於 slice、m ...
  • 南昌航空大學-軟體學院-22206104-段清如-JAVA第一次Blog作業 前言: 這個學期才開始接觸java,到現在一個多月的時間,已經差不多可以寫出一些基本的簡單的程式了。對比上個學期學習的C語言,我認為java更加方便,方法更多,函數更多,但是時間效率上略遜一籌。在這一個月的java學習過程 ...
  • 在上一章中已經看到,odoo能夠為給定模型生成預設視圖。實際上,預設視圖對於業務應用程式來說是不可接受的。相反,我們至少應該以邏輯的方式組織各個欄位。 視圖是在帶有操作和菜單的XML文件中定義的。它們是ir.ui.view model的實例。 在我們的estate模塊中,我們需要以邏輯方式組織欄位: ...
  • 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇介紹在 QT 中使用 VLD 時,有一處記憶體泄漏時的輸出報告解析。 ...
  • 目錄 瞭解需求 方案 1:資料庫輪詢 方案 2:JDK 的延遲隊列 方案 3:時間輪演算法 方案 4:redis 緩存 方案 5:使用消息隊列 瞭解需求 在開發中,往往會遇到一些關於延時任務的需求。 例如 生成訂單 30 分鐘未支付,則自動取消 生成訂單 60 秒後,給用戶發簡訊 對上述的任務,我們給 ...
  • 操作系統 :Windows10_x64 、CentOS 7.6.1810_x64 wireshark版本:3.6.12 Python 版本 : 3.9.12 一、背景描述 工作中有時候會遇到需要從pcap抓包文件裡面提取音頻的情況,比如下麵這些場景: 從pcap文件裡面導出wav文件 從pcap文件 ...
一周排行
    -Advertisement-
    Play Games
  • 周末,寫點簡單的水一下。 新版本的vs創建項目的時候可以選擇自帶一個swagger。然而這隻是基本的swagger功能。 幾個介面無所謂啦,隨著介面越來越多,就這麼丟給你,一時間也會懵逼,所以這篇文章要做的有兩個功能。 給swagger文檔添加註釋 給swagger添加切換“版本”的功能(也可以理解 ...
  • 大家好,我是沙漠盡頭的狼。 本文首發於Dotnet9,介紹使用Lib.Harmony庫攔截第三方.NET庫方法,達到不修改其源碼並能實現修改方法邏輯、預期行為的效果,並且不限於只攔截public訪問修飾的類及方法,行文目錄: 什麼是方法攔截? 示常式序攔截 非public方法怎麼攔截? 總結 1. ...
  • 問題代碼: xmal:一個按鈕+一個顯示框 1 <Button Width="100" Height="50" Margin="10" Click="Button_Click">test</Button> 2 <TextBox x:Name="display" Width="300" Height= ...
  • 前置條件 ​ 阿裡雲伺服器一臺(可在購買伺服器時勾選安裝寶塔選項,免去後面的寶塔安裝) ​ 設置阿裡雲伺服器密碼並登陸伺服器 ​ 以下操作均在伺服器Linux中進行(使用遠程連接工具登錄) 寶塔登錄 登錄阿裡雲伺服器在Linux命令行中輸入bt,查看寶塔信息 ​ 根據寶塔信息提供的網站登陸寶塔服務( ...
  • GetTokenInformation 用於檢索進程或線程的令牌(Token)信息。Token是一個數據結構,其包含有關進程或線程的安全上下文,代表當前用戶或服務的安全標識符和許可權信息。GetTokenInformation函數也可以用來獲取這些安全信息,通常用於在運行時檢查某個進程或線程的許可權或安... ...
  • matplotlib 在1.0版本之前其實是不支持3D圖形繪製的。 後來的版本中,matplotlib加入了3D圖形的支持,不僅僅是為了使數據的展示更加生動和有趣。更重要的是,由於多了一個維度,擴展了其展示數據分佈和關係的能力,可以一次從三個維度來比較數據。 下麵介紹在matplotlib中繪製各類 ...
  • 編寫一個App就能編譯發佈到iOS、Android和Web等各大平臺的跨平臺技術,各大廠商一直都有研究和發佈對應技術產品,目前最熱門的莫過於Flutter框架了。而Dart作為其唯一的編程語言,今天我們開始來體驗一下…… ...
  • 實現基本的線程池 前提:我們要實現的線程池有如下功能: 基本的線程池模型 能提交和運行任務 能正常關閉線程池 線程的拒絕策略 線程池擴容 縮容線程池 代碼地址: 1、線程池的介紹? 線程池是什麼? 線程池是一種利用池化技術來管理線程的一種技術。 當沒有線程池的時候,我們如何創建線程? 繼承Threa ...
  • SDRAM基本信息 儲存能力計算 4X16X4=256(Mbit),註意不是MByte SDRAM控制 sdram包含兩個部分:sdram_ctrl、fifo_ctrl。 sdram_ctrl:其頂層為SDRAM的控制模塊內部實例化了5個模塊,有初始化、自刷新、寫和讀模塊,還有一個仲裁模塊對這四個不 ...
  • 歡迎訪問我的GitHub 這裡分類和彙總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos 本篇概覽 欣宸正在為接下新的Java雲原生實戰系列原創做準備,既然是實戰,少不了一套雲原生環境,以下內容是必不可少的: linux操作系統 kuberne ...