使用Netty框架完成客戶端和服務端收發Protobuf消息

来源:https://www.cnblogs.com/dk168/archive/2023/03/26/17259124.html
-Advertisement-
Play Games

電腦組成原理 哈工大 劉巨集偉 b站課程地址:https://www.bilibili.com/video/BV1t4411e7LH/?spm_id_from=333.337.search-card.all.click 編譯原理 哈工大 陳鄞 b站課程地址:https://www.bilibili. ...


前言

本周繼續學習尼恩編著的《Netty、Redis、ZooKeeper高併發實戰》,一些資源也貼在這裡,自己以後想看還可以找到,這個是在博客園的一個入口https://www.cnblogs.com/crazymakercircle/p/9904544.html。
這周主要學習了Netty客戶端和服務端通信,書是由淺入深的在進行,從Socket NOI通信到 Reactor反應器模式,再到Netty框架,示例代碼都在https://gitee.com/crazymaker/netty_redis_zookeeper_source_code.git 中可以看到,書結合源代碼,自己在動手試驗一下,感覺還是有些收穫。 今天的示例代碼就是實踐出一個客戶端和服務端傳遞protobuf的例子。

Netty 服務端

先來看一下服務端代碼,


@Slf4j
public class ProtoBufServer {

    private final int serverPort;
    ServerBootstrap b = new ServerBootstrap();

    public ProtoBufServer(int port) {
        this.serverPort = port;
    }

    public void runServer() {
        //創建reactor 線程組
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設置reactor 線程組
            b.group(bossLoopGroup, workerLoopGroup);
            //2 設置nio類型的channel
            b.channel(NioServerSocketChannel.class);
            //3 設置監聽埠
            b.localAddress(serverPort);
            //4 設置通道的參數
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 裝配子通道流水線
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有連接到達時會創建一個channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水線添加3個handler處理器

                    // protobufDecoder僅僅負責編碼,並不支持讀半包,所以在之前,一定要有讀半包的處理器。
                    // 有三種方式可以選擇:
                    // 使用netty提供ProtobufVarint32FrameDecoder
                    // 繼承netty提供的通用半包處理器 LengthFieldBasedFrameDecoder
                    // 繼承ByteToMessageDecoder類,自己處理半包

                    // 半包的處理
                    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    // 需要解碼的目標類
                    ch.pipeline().addLast(new ProtobufDecoder(MarketPriceProto.MarketPrice.getDefaultInstance()));
                    ch.pipeline().addLast(new ProtobufBussinessHandler());
                }
            });
            // 6 開始綁定server
            // 通過調用sync同步方法阻塞直到綁定成功
            ChannelFuture channelFuture = b.bind().sync();
            log.info(" 伺服器啟動成功,監聽埠: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括創建的線程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

    //伺服器端業務處理器
    static class ProtobufBussinessHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            MarketPriceProto.MarketPrice protoMsg = (MarketPriceProto.MarketPrice) msg;
            //經過pipeline的各個decoder,到此Person類型已經可以斷定
            log.info("收到一個 MsgProtos.Msg 數據包 =》");
            log.info("protoMsg.getId():=" + protoMsg.getId());
            log.info("protoMsg.getClose():=" + protoMsg.getClose());
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = SERVER_PORT;
        new ProtoBufServer(port).runServer();
    }
}

代碼中有註釋解釋,我在這裡加一下說明
代碼中有兩個EventLoopGroup bossLoopGroup和EventLoopGroup workerLoopGroup,使用兩個是什麼原因呢? 一個是負責處理連接監聽事件, 一個負責處理數據IO事件和Handler業務處理,通俗點解釋就是一個負責接客,一個負責服務客戶。如果只有一個人就會忙不過來,讓後面的人等很久。
b.childHandler這個就是我們具體的如何處理接收到的消息,他們都繼承ChannelInboundHandlerAdapter,通過PipeLine把消息進行處理。我們從通道裡面拿到的都是位元組碼,那麼要轉成我們需要的Protobuf類,就需要用到這些處理類
前兩個處理類ProtobufVarint32FrameDecoder和ProtobufDecoder都是Netty提供的,一個是為瞭解決半包問題,半包問題是因為在TCP傳輸的時候對數據包進行了拆包或者分包,收到的時候如果直接處理,就會有問題,需要我們在應用層進行二次封裝。 在這個示例中如果我們不使用ProtobufVarint32FrameDecoder,客戶端也不用,那麼就會出現有的可以解析出來,有的報錯的情況: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
可以用搬家來打比分,我們把一個家從一個地方搬到另外一個地方,還要求佈局一樣,汽車運輸的時候可能要分好幾次,那麼我們可以先記住位置,然後隨便裝車搬過去,過去後先暫存,再按記錄的位置進行還原,這樣來保證一模一樣。
ProtobufBussinessHandler 是我們自定義的Handler繼承了ChannelInboundHandlerAdapter,通過它我們拿到channel裡面的數據,轉換成我們的具體的Protobuf對象。這裡只是簡單的列印出來。 如果要把它繼續傳下去,需要調用 super.channelRead(ctx,msg)傳遞下去。

客戶端


@Slf4j
public class ProtoBufScanClient {
    private int serverPort;
    private String serverIp;
    Bootstrap b = new Bootstrap();

    public ProtoBufScanClient(String ip, int port) {
        this.serverPort = port;
        this.serverIp = ip;
    }

    public void runClient() {
        //創建reactor 線程組
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設置reactor 線程組
            b.group(workerLoopGroup);
            //2 設置nio類型的channel
            b.channel(NioSocketChannel.class);
            //3 設置監聽埠
            b.remoteAddress(serverIp, serverPort);
            //4 設置通道的參數
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 裝配通道流水線
            b.handler(new ChannelInitializer<SocketChannel>() {
                //初始化客戶端channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 客戶端channel流水線添加2個handler處理器
                    ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    ch.pipeline().addLast(new ProtobufEncoder());
                }
            });
            ChannelFuture f = b.connect();
            f.addListener((ChannelFuture futureListener) ->
            {
                if (futureListener.isSuccess()) {
                    log.info("EchoClient客戶端連接成功!");

                } else {
                    log.info("EchoClient客戶端連接失敗!");
                }

            });

            // 阻塞,直到連接完成
            f.sync();
            Channel channel = f.channel();


            Scanner scanner = new Scanner(System.in);
            log.info("請輸入發送內容:");


            GenericFutureListener sendCallBack = new GenericFutureListener() {

                @Override
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess()) {
                        log.info("發送成功!");

                    } else {
                        log.info("發送失敗!");
                    }
                }
            };


            while (scanner.hasNext()) {
                //獲取輸入的內容
                String next = scanner.next();
                String[] values = next.split(",");
                if(values.length != 5)
                {
                    log.info("格式不正確!");
                }
                else {
                    MarketPriceProto.MarketPrice msg = build(values);
                    ChannelFuture writeAndFlushFuture = channel.writeAndFlush(msg);
                    writeAndFlushFuture.addListener(sendCallBack);
                }

                log.info("請輸入發送內容:");

            }


            channel.flush();


            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture = channel.closeFuture();
            closeFuture.sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括創建的線程
            workerLoopGroup.shutdownGracefully();
        }

    }

    //構建ProtoBuf對象
    public MarketPriceProto.MarketPrice build(String[] values) {

        MarketPriceProto.MarketPrice.Builder builder = MarketPriceProto.MarketPrice.newBuilder();

        builder.setId(values[0]);


        builder.setOpen(Double.valueOf(values[1]));
        builder.setHigh(Double.valueOf(values[2]));
        builder.setLow(Double.valueOf(values[3]));
        builder.setClose(Double.valueOf(values[4]));

        return builder.build();
    }

    public static void main(String[] args) throws InterruptedException {
        int port = SERVER_PORT;
        String ip = SOCKET_SERVER_IP;
        new ProtoBufScanClient(ip, port).runClient();
    }
}

Netty客戶端代碼和服務端很接近,這裡它只用了一個線程組,客戶端只有它一個使用,和服務端模式不一樣,一個就夠了。
客戶端讀取控制台輸入數據,然後構造成MarketPriceProto.MarketPrice,它的Proto定義如下

// [開始聲明]
syntax = "proto3";
//定義protobuf的包名稱空間
package com.ken.netty.protocol;
// [結束聲明]

// [開始 java 選項配置]
option java_package = "com.ken.netty.protocol";
option java_outer_classname = "MarketPriceProto";
// [結束 java 選項配置]

// [開始 消息定義]
message MarketPrice {
  string id = 1;
  double open = 2;
  double high = 3;
  double low = 4;
  double close = 5;
}

channel.writeAndFlush(msg), 這裡我們直接把Protobuf對象傳遞進channel, ProtobufEncoder會對它進行編碼。

總結

代碼比較簡單,這是在有書的幫助和源代碼幫助下實現的,不通過書這個也不是這麼容易理解的。 作者還實現了一個及時通信的例子https://gitee.com/crazymaker/SimpleCrayIM。 也需要花些時間來學習一下。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇介紹在 QT 中使用 VLD 時,有多處記憶體泄漏時的輸出報告解析。 ...
  • 將用戶發來的指令以RESP協議的形式存儲在本地的AOF文件,重啟Redis後執行此文件恢複數據 https://github.com/csgopher/go-redis 本文涉及以下文件: redis.conf:配置文件 aof:實現aof redis.conf appendonly yes app ...
  • 原文鏈接: Go 語言 new 和 make 關鍵字的區別 本篇文章來介紹一道非常常見的面試題,到底有多常見呢?可能很多面試的開場白就是由此開始的。那就是 new 和 make 這兩個內置函數的區別。 其實這個問題本身並不複雜,簡單來說就是,new 只分配記憶體,而 make 只能用於 slice、m ...
  • 南昌航空大學-軟體學院-22206104-段清如-JAVA第一次Blog作業 前言: 這個學期才開始接觸java,到現在一個多月的時間,已經差不多可以寫出一些基本的簡單的程式了。對比上個學期學習的C語言,我認為java更加方便,方法更多,函數更多,但是時間效率上略遜一籌。在這一個月的java學習過程 ...
  • 在上一章中已經看到,odoo能夠為給定模型生成預設視圖。實際上,預設視圖對於業務應用程式來說是不可接受的。相反,我們至少應該以邏輯的方式組織各個欄位。 視圖是在帶有操作和菜單的XML文件中定義的。它們是ir.ui.view model的實例。 在我們的estate模塊中,我們需要以邏輯方式組織欄位: ...
  • 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇介紹在 QT 中使用 VLD 時,有一處記憶體泄漏時的輸出報告解析。 ...
  • 目錄 瞭解需求 方案 1:資料庫輪詢 方案 2:JDK 的延遲隊列 方案 3:時間輪演算法 方案 4:redis 緩存 方案 5:使用消息隊列 瞭解需求 在開發中,往往會遇到一些關於延時任務的需求。 例如 生成訂單 30 分鐘未支付,則自動取消 生成訂單 60 秒後,給用戶發簡訊 對上述的任務,我們給 ...
  • 操作系統 :Windows10_x64 、CentOS 7.6.1810_x64 wireshark版本:3.6.12 Python 版本 : 3.9.12 一、背景描述 工作中有時候會遇到需要從pcap抓包文件裡面提取音頻的情況,比如下麵這些場景: 從pcap文件裡面導出wav文件 從pcap文件 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...