使用Netty框架完成客戶端和服務端收發Protobuf消息

来源:https://www.cnblogs.com/dk168/archive/2023/03/26/17259124.html
-Advertisement-
Play Games

電腦組成原理 哈工大 劉巨集偉 b站課程地址:https://www.bilibili.com/video/BV1t4411e7LH/?spm_id_from=333.337.search-card.all.click 編譯原理 哈工大 陳鄞 b站課程地址:https://www.bilibili. ...


前言

本周繼續學習尼恩編著的《Netty、Redis、ZooKeeper高併發實戰》,一些資源也貼在這裡,自己以後想看還可以找到,這個是在博客園的一個入口https://www.cnblogs.com/crazymakercircle/p/9904544.html。
這周主要學習了Netty客戶端和服務端通信,書是由淺入深的在進行,從Socket NOI通信到 Reactor反應器模式,再到Netty框架,示例代碼都在https://gitee.com/crazymaker/netty_redis_zookeeper_source_code.git 中可以看到,書結合源代碼,自己在動手試驗一下,感覺還是有些收穫。 今天的示例代碼就是實踐出一個客戶端和服務端傳遞protobuf的例子。

Netty 服務端

先來看一下服務端代碼,


@Slf4j
public class ProtoBufServer {

    private final int serverPort;
    ServerBootstrap b = new ServerBootstrap();

    public ProtoBufServer(int port) {
        this.serverPort = port;
    }

    public void runServer() {
        //創建reactor 線程組
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設置reactor 線程組
            b.group(bossLoopGroup, workerLoopGroup);
            //2 設置nio類型的channel
            b.channel(NioServerSocketChannel.class);
            //3 設置監聽埠
            b.localAddress(serverPort);
            //4 設置通道的參數
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 裝配子通道流水線
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有連接到達時會創建一個channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水線添加3個handler處理器

                    // protobufDecoder僅僅負責編碼,並不支持讀半包,所以在之前,一定要有讀半包的處理器。
                    // 有三種方式可以選擇:
                    // 使用netty提供ProtobufVarint32FrameDecoder
                    // 繼承netty提供的通用半包處理器 LengthFieldBasedFrameDecoder
                    // 繼承ByteToMessageDecoder類,自己處理半包

                    // 半包的處理
                    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    // 需要解碼的目標類
                    ch.pipeline().addLast(new ProtobufDecoder(MarketPriceProto.MarketPrice.getDefaultInstance()));
                    ch.pipeline().addLast(new ProtobufBussinessHandler());
                }
            });
            // 6 開始綁定server
            // 通過調用sync同步方法阻塞直到綁定成功
            ChannelFuture channelFuture = b.bind().sync();
            log.info(" 伺服器啟動成功,監聽埠: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括創建的線程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

    //伺服器端業務處理器
    static class ProtobufBussinessHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            MarketPriceProto.MarketPrice protoMsg = (MarketPriceProto.MarketPrice) msg;
            //經過pipeline的各個decoder,到此Person類型已經可以斷定
            log.info("收到一個 MsgProtos.Msg 數據包 =》");
            log.info("protoMsg.getId():=" + protoMsg.getId());
            log.info("protoMsg.getClose():=" + protoMsg.getClose());
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = SERVER_PORT;
        new ProtoBufServer(port).runServer();
    }
}

代碼中有註釋解釋,我在這裡加一下說明
代碼中有兩個EventLoopGroup bossLoopGroup和EventLoopGroup workerLoopGroup,使用兩個是什麼原因呢? 一個是負責處理連接監聽事件, 一個負責處理數據IO事件和Handler業務處理,通俗點解釋就是一個負責接客,一個負責服務客戶。如果只有一個人就會忙不過來,讓後面的人等很久。
b.childHandler這個就是我們具體的如何處理接收到的消息,他們都繼承ChannelInboundHandlerAdapter,通過PipeLine把消息進行處理。我們從通道裡面拿到的都是位元組碼,那麼要轉成我們需要的Protobuf類,就需要用到這些處理類
前兩個處理類ProtobufVarint32FrameDecoder和ProtobufDecoder都是Netty提供的,一個是為瞭解決半包問題,半包問題是因為在TCP傳輸的時候對數據包進行了拆包或者分包,收到的時候如果直接處理,就會有問題,需要我們在應用層進行二次封裝。 在這個示例中如果我們不使用ProtobufVarint32FrameDecoder,客戶端也不用,那麼就會出現有的可以解析出來,有的報錯的情況: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
可以用搬家來打比分,我們把一個家從一個地方搬到另外一個地方,還要求佈局一樣,汽車運輸的時候可能要分好幾次,那麼我們可以先記住位置,然後隨便裝車搬過去,過去後先暫存,再按記錄的位置進行還原,這樣來保證一模一樣。
ProtobufBussinessHandler 是我們自定義的Handler繼承了ChannelInboundHandlerAdapter,通過它我們拿到channel裡面的數據,轉換成我們的具體的Protobuf對象。這裡只是簡單的列印出來。 如果要把它繼續傳下去,需要調用 super.channelRead(ctx,msg)傳遞下去。

客戶端


@Slf4j
public class ProtoBufScanClient {
    private int serverPort;
    private String serverIp;
    Bootstrap b = new Bootstrap();

    public ProtoBufScanClient(String ip, int port) {
        this.serverPort = port;
        this.serverIp = ip;
    }

    public void runClient() {
        //創建reactor 線程組
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設置reactor 線程組
            b.group(workerLoopGroup);
            //2 設置nio類型的channel
            b.channel(NioSocketChannel.class);
            //3 設置監聽埠
            b.remoteAddress(serverIp, serverPort);
            //4 設置通道的參數
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 裝配通道流水線
            b.handler(new ChannelInitializer<SocketChannel>() {
                //初始化客戶端channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 客戶端channel流水線添加2個handler處理器
                    ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    ch.pipeline().addLast(new ProtobufEncoder());
                }
            });
            ChannelFuture f = b.connect();
            f.addListener((ChannelFuture futureListener) ->
            {
                if (futureListener.isSuccess()) {
                    log.info("EchoClient客戶端連接成功!");

                } else {
                    log.info("EchoClient客戶端連接失敗!");
                }

            });

            // 阻塞,直到連接完成
            f.sync();
            Channel channel = f.channel();


            Scanner scanner = new Scanner(System.in);
            log.info("請輸入發送內容:");


            GenericFutureListener sendCallBack = new GenericFutureListener() {

                @Override
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess()) {
                        log.info("發送成功!");

                    } else {
                        log.info("發送失敗!");
                    }
                }
            };


            while (scanner.hasNext()) {
                //獲取輸入的內容
                String next = scanner.next();
                String[] values = next.split(",");
                if(values.length != 5)
                {
                    log.info("格式不正確!");
                }
                else {
                    MarketPriceProto.MarketPrice msg = build(values);
                    ChannelFuture writeAndFlushFuture = channel.writeAndFlush(msg);
                    writeAndFlushFuture.addListener(sendCallBack);
                }

                log.info("請輸入發送內容:");

            }


            channel.flush();


            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture = channel.closeFuture();
            closeFuture.sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括創建的線程
            workerLoopGroup.shutdownGracefully();
        }

    }

    //構建ProtoBuf對象
    public MarketPriceProto.MarketPrice build(String[] values) {

        MarketPriceProto.MarketPrice.Builder builder = MarketPriceProto.MarketPrice.newBuilder();

        builder.setId(values[0]);


        builder.setOpen(Double.valueOf(values[1]));
        builder.setHigh(Double.valueOf(values[2]));
        builder.setLow(Double.valueOf(values[3]));
        builder.setClose(Double.valueOf(values[4]));

        return builder.build();
    }

    public static void main(String[] args) throws InterruptedException {
        int port = SERVER_PORT;
        String ip = SOCKET_SERVER_IP;
        new ProtoBufScanClient(ip, port).runClient();
    }
}

Netty客戶端代碼和服務端很接近,這裡它只用了一個線程組,客戶端只有它一個使用,和服務端模式不一樣,一個就夠了。
客戶端讀取控制台輸入數據,然後構造成MarketPriceProto.MarketPrice,它的Proto定義如下

// [開始聲明]
syntax = "proto3";
//定義protobuf的包名稱空間
package com.ken.netty.protocol;
// [結束聲明]

// [開始 java 選項配置]
option java_package = "com.ken.netty.protocol";
option java_outer_classname = "MarketPriceProto";
// [結束 java 選項配置]

// [開始 消息定義]
message MarketPrice {
  string id = 1;
  double open = 2;
  double high = 3;
  double low = 4;
  double close = 5;
}

channel.writeAndFlush(msg), 這裡我們直接把Protobuf對象傳遞進channel, ProtobufEncoder會對它進行編碼。

總結

代碼比較簡單,這是在有書的幫助和源代碼幫助下實現的,不通過書這個也不是這麼容易理解的。 作者還實現了一個及時通信的例子https://gitee.com/crazymaker/SimpleCrayIM。 也需要花些時間來學習一下。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇介紹在 QT 中使用 VLD 時,有多處記憶體泄漏時的輸出報告解析。 ...
  • 將用戶發來的指令以RESP協議的形式存儲在本地的AOF文件,重啟Redis後執行此文件恢複數據 https://github.com/csgopher/go-redis 本文涉及以下文件: redis.conf:配置文件 aof:實現aof redis.conf appendonly yes app ...
  • 原文鏈接: Go 語言 new 和 make 關鍵字的區別 本篇文章來介紹一道非常常見的面試題,到底有多常見呢?可能很多面試的開場白就是由此開始的。那就是 new 和 make 這兩個內置函數的區別。 其實這個問題本身並不複雜,簡單來說就是,new 只分配記憶體,而 make 只能用於 slice、m ...
  • 南昌航空大學-軟體學院-22206104-段清如-JAVA第一次Blog作業 前言: 這個學期才開始接觸java,到現在一個多月的時間,已經差不多可以寫出一些基本的簡單的程式了。對比上個學期學習的C語言,我認為java更加方便,方法更多,函數更多,但是時間效率上略遜一籌。在這一個月的java學習過程 ...
  • 在上一章中已經看到,odoo能夠為給定模型生成預設視圖。實際上,預設視圖對於業務應用程式來說是不可接受的。相反,我們至少應該以邏輯的方式組織各個欄位。 視圖是在帶有操作和菜單的XML文件中定義的。它們是ir.ui.view model的實例。 在我們的estate模塊中,我們需要以邏輯方式組織欄位: ...
  • 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。本篇介紹在 QT 中使用 VLD 時,有一處記憶體泄漏時的輸出報告解析。 ...
  • 目錄 瞭解需求 方案 1:資料庫輪詢 方案 2:JDK 的延遲隊列 方案 3:時間輪演算法 方案 4:redis 緩存 方案 5:使用消息隊列 瞭解需求 在開發中,往往會遇到一些關於延時任務的需求。 例如 生成訂單 30 分鐘未支付,則自動取消 生成訂單 60 秒後,給用戶發簡訊 對上述的任務,我們給 ...
  • 操作系統 :Windows10_x64 、CentOS 7.6.1810_x64 wireshark版本:3.6.12 Python 版本 : 3.9.12 一、背景描述 工作中有時候會遇到需要從pcap抓包文件裡面提取音頻的情況,比如下麵這些場景: 從pcap文件裡面導出wav文件 從pcap文件 ...
一周排行
    -Advertisement-
    Play Games
  • 最近做項目過程中,使用到了海康相機,官方只提供了C/C++的SDK,沒有搜尋到一個合適的封裝了的C#庫,故自己動手,簡單的封裝了一下,方便大家也方便自己使用和二次開發 ...
  • 前言 MediatR 是 .NET 下的一個實現消息傳遞的庫,輕量級、簡潔高效,用於實現進程內的消息傳遞機制。它基於中介者設計模式,支持請求/響應、命令、查詢、通知和事件等多種消息傳遞模式。通過泛型支持,MediatR 可以智能地調度不同類型的消息,非常適合用於領域事件處理。 在本文中,將通過一個簡 ...
  • 前言 今天給大家推薦一個超實用的開源項目《.NET 7 + Vue 許可權管理系統 小白快速上手》,DncZeus的願景就是做一個.NET 領域小白也能上手的簡易、通用的後臺許可權管理模板系統基礎框架。 不管你是技術小白還是技術大佬或者是不懂前端Vue 的新手,這個項目可以快速上手讓我們從0到1,搭建自 ...
  • 第1章:WPF概述 本章目標 瞭解Windows圖形演化 瞭解WPF高級API 瞭解解析度無關性概念 瞭解WPF體繫結構 瞭解WPF 4.5 WPF概述 ​ 歡迎使用 Windows Presentation Foundation (WPF) 桌面指南,這是一個與解析度無關的 UI 框架,使用基於矢 ...
  • 在日常開發中,並不是所有的功能都是用戶可見的,還在一些背後默默支持的程式,這些程式通常以服務的形式出現,統稱為輔助角色服務。今天以一個簡單的小例子,簡述基於.NET開發輔助角色服務的相關內容,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 第3章:佈局 本章目標 理解佈局的原則 理解佈局的過程 理解佈局的容器 掌握各類佈局容器的運用 理解 WPF 中的佈局 WPF 佈局原則 ​ WPF 視窗只能包含單個元素。為在WPF 視窗中放置多個元素並創建更貼近實用的用戶男面,需要在視窗上放置一個容器,然後在這個容器中添加其他元素。造成這一限制的 ...
  • 前言 在平時項目開發中,定時任務調度是一項重要的功能,廣泛應用於後臺作業、計劃任務和自動化腳本等模塊。 FreeScheduler 是一款輕量級且功能強大的定時任務調度庫,它支持臨時的延時任務和重覆迴圈任務(可持久化),能夠按秒、每天/每周/每月固定時間或自定義間隔執行(CRON 表達式)。 此外 ...
  • 目錄Blazor 組件基礎路由導航參數組件參數路由參數生命周期事件狀態更改組件事件 Blazor 組件 基礎 新建一個項目命名為 MyComponents ,項目模板的交互類型選 Auto ,其它保持預設選項: 客戶端組件 (Auto/WebAssembly): 最終解決方案裡面會有兩個項目:伺服器 ...
  • 先看一下效果吧: isChecked = false 的時候的效果 isChecked = true 的時候的效果 然後我們來實現一下這個效果吧 第一步:創建一個空的wpf項目; 第二步:在項目裡面添加一個checkbox <Grid> <CheckBox HorizontalAlignment=" ...
  • 在編寫上位機軟體時,需要經常處理命令拼接與其他設備進行通信,通常對不同的命令封裝成不同的方法,擴展稍許麻煩。 本次擬以特性方式實現,以兼顧維護性與擴展性。 思想: 一種命令對應一個類,其類中的各個屬性對應各個命令段,通過特性的方式,實現其在這包數據命令中的位置、大端或小端及其轉換為對應的目標類型; ...