從Redis、HTTP協議,看Nett協議設計,我發現了個驚天大秘密

来源:https://www.cnblogs.com/jiagooushi/archive/2023/01/03/17022190.html
-Advertisement-
Play Games

1. 協議的作用 TCP/IP 中消息傳輸基於流的方式,沒有邊界 協議的目的就是劃定消息的邊界,制定通信雙方要共同遵守的通信規則 2. Redis 協議 如果我們要向 Redis 伺服器發送一條 set name Nyima 的指令,需要遵守如下協議 // 該指令一共有3部分,每條指令之後都要添加回 ...


1. 協議的作用

TCP/IP 中消息傳輸基於流的方式,沒有邊界

協議的目的就是劃定消息的邊界,制定通信雙方要共同遵守的通信規則

2. Redis 協議

如果我們要向 Redis 伺服器發送一條 set name Nyima 的指令,需要遵守如下協議

// 該指令一共有3部分,每條指令之後都要添加回車與換行符
*3\r\n
// 第一個指令的長度是3
$3\r\n
// 第一個指令是set指令
set\r\n
// 下麵的指令以此類推
$4\r\n
name\r\n
$5\r\n
Nyima\r\n

客戶端代碼如下

public class RedisClient {
    static final Logger log = LoggerFactory.getLogger(StudyServer.class);
    public static void main(String[] args) {
        NioEventLoopGroup group =  new NioEventLoopGroup();
        try {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // 列印日誌
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    // 回車與換行符
                                    final byte[] LINE = {'\r','\n'};
                                    // 獲得ByteBuf
                                    ByteBuf buffer = ctx.alloc().buffer();
                                    // 連接建立後,向Redis中發送一條指令,註意添加回車與換行
                                    // set name Nyima
                                    buffer.writeBytes("*3".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("$3".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("set".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("$4".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("name".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("$5".getBytes());
                                    buffer.writeBytes(LINE);
                                    buffer.writeBytes("Nyima".getBytes());
                                    buffer.writeBytes(LINE);
                                    ctx.writeAndFlush(buffer);
                                }

                            });
                        }
                    })
                    .connect(new InetSocketAddress("localhost", 6379));
            channelFuture.sync();
            // 關閉channel
            channelFuture.channel().close().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 關閉group
            group.shutdownGracefully();
        }
    }
}

控制台列印結果

1600 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x28c994f1, L:/127.0.0.1:60792 - R:localhost/127.0.0.1:6379] WRITE: 34B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.|
|00000010| 0a 6e 61 6d 65 0d 0a 24 35 0d 0a 4e 79 69 6d 61 |.name..$5..Nyima|
|00000020| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+

Redis 中查詢執行結果

file

3. HTTP 協議

HTTP 協議在請求行請求頭中都有很多的內容,自己實現較為困難,可以使用 HttpServerCodec 作為伺服器端的解碼器與編碼器,來處理 HTTP 請求

// HttpServerCodec 中既有請求的解碼器 HttpRequestDecoder 又有響應的編碼器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表該類既作為 編碼器 又作為 解碼器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
        implements HttpServerUpgradeHandler.SourceCodec

伺服器代碼

public class HttpServer {
    static final Logger log = LoggerFactory.getLogger(StudyServer.class);

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        new ServerBootstrap()
                .group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        // 作為伺服器,使用 HttpServerCodec 作為編碼器與解碼器
                        ch.pipeline().addLast(new HttpServerCodec());
                        // 伺服器只處理HTTPRequest
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
                                // 獲得請求uri
                                log.debug(msg.uri());

                                // 獲得完整響應,設置版本號與狀態碼
                                DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                                // 設置響應內容
                                byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
                                // 設置響應體長度,避免瀏覽器一直接收響應內容
                                response.headers().setInt(CONTENT_LENGTH, bytes.length);
                                // 設置響應體
                                response.content().writeBytes(bytes);

                                // 寫迴響應
                                ctx.writeAndFlush(response);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

伺服器負責處理請求並響應瀏覽器。所以只需要處理 HTTP 請求即可

// 伺服器只處理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()

獲得請求後,需要返迴響應給瀏覽器。需要創建響應對象 DefaultFullHttpResponse,設置 HTTP 版本號及狀態碼,為避免瀏覽器獲得響應後,因為獲得 CONTENT_LENGTH 而一直空轉,需要添加 CONTENT_LENGTH 欄位,表明響應體中數據的具體長度

// 獲得完整響應,設置版本號與狀態碼
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 設置響應內容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 設置響應體長度,避免瀏覽器一直接收響應內容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 設置響應體
response.content().writeBytes(bytes);

運行結果

瀏覽器

file
控制台

// 請求內容
1714 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] READ: 688B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee|
|00000040| 70 2d 61 6c 69 76 65 0d 0a 50 72 61 67 6d 61 3a |p-alive..Pragma:|
....

// 響應內容
1716 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] WRITE: 61B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 43 6f 6e 74 65 6e 74 2d 4c 65 6e 67 74 68 3a |.Content-Length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e          |, World!</h1>   |
+--------+-------------------------------------------------+----------------+

4. 自定義協議

組成要素

  • 魔數:用來在第一時間判定接收的數據是否為無效數據包

  • 版本號:可以支持協議的升級

  • 序列化演算法

    :消息正文到底採用哪種序列化反序列化方式

    • 如:json、protobuf、hessian、jdk
  • 指令類型:是登錄、註冊、單聊、群聊… 跟業務相關

  • 請求序號:為了雙工通信,提供非同步能力

  • 正文長度

  • 消息正文

編碼器與解碼器

public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 設置魔數 4個位元組
        out.writeBytes(new byte[]{'N','Y','I','M'});
        // 設置版本號 1個位元組
        out.writeByte(1);
        // 設置序列化方式 1個位元組
        out.writeByte(1);
        // 設置指令類型 1個位元組
        out.writeByte(msg.getMessageType());
        // 設置請求序號 4個位元組
        out.writeInt(msg.getSequenceId());
        // 為了補齊為16個位元組,填充1個位元組的數據
        out.writeByte(0xff);

        // 獲得序列化後的msg
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();

        // 獲得並設置正文長度 長度用4個位元組標識
        out.writeInt(bytes.length);
        // 設置消息正文
        out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 獲取魔數
        int magic = in.readInt();
        // 獲取版本號
        byte version = in.readByte();
        // 獲得序列化方式
        byte seqType = in.readByte();
        // 獲得指令類型
        byte messageType = in.readByte();
        // 獲得請求序號
        int sequenceId = in.readInt();
        // 移除補齊位元組
        in.readByte();
        // 獲得正文長度
        int length = in.readInt();
        // 獲得正文
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
		// 將信息放入List中,傳遞給下一個handler
        out.add(message);
        
        // 列印獲得的信息正文
        System.out.println("===========魔數===========");
        System.out.println(magic);
        System.out.println("===========版本號===========");
        System.out.println(version);
        System.out.println("===========序列化方法===========");
        System.out.println(seqType);
        System.out.println("===========指令類型===========");
        System.out.println(messageType);
        System.out.println("===========請求序號===========");
        System.out.println(sequenceId);
        System.out.println("===========正文長度===========");
        System.out.println(length);
        System.out.println("===========正文===========");
        System.out.println(message);
    }
}

  • 編碼器與解碼器方法源於父類 ByteToMessageCodec,通過該類可以自定義編碼器與解碼器, 泛型類型為被編碼與被解碼的類。此處使用了自定義類 Message,代表消息
public class MessageCodec extends ByteToMessageCodec<Message>
  • 編碼器負責將附加信息與正文信息寫入到 ByteBuf 中,其中附加信息總位元組數最好為 2n,不足需要補齊。正文內容如果為對象,需要通過序列化將其放入到 ByteBuf 中

  • 解碼器負責將 ByteBuf 中的信息取出,並放入 List 中,該 List 用於將信息傳遞給下一個 handler

編寫測試類

public class TestCodec {
    static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);
    public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel();
        // 添加解碼器,避免粘包半包問題
        channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
        channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
        channel.pipeline().addLast(new MessageCodec());
        LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");

        // 測試編碼與解碼
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null, user, byteBuf);
        channel.writeInbound(byteBuf);
    }
}
  • 測試類中用到了 LengthFieldBasedFrameDecoder,避免粘包半包問題
  • 通過 MessageCodec 的 encode 方法將附加信息與正文寫入到 ByteBuf 中,通過 channel 執行入站操作。入站時會調用 decode 方法進行解碼

運行結果

file

file

@Sharable 註解

為了提高 handler 的復用率,可以將 handler 創建為 handler 對象,然後在不同的 channel 中使用該 handler 對象進行處理操作

LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一個handler對象,提高復用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);

但是並不是所有的 handler 都能通過這種方法來提高復用率的,例如 LengthFieldBasedFrameDecoder。如果多個 channel 中使用同一個 LengthFieldBasedFrameDecoder 對象,則可能發生如下問題

  • channel1 中收到了一個半包,LengthFieldBasedFrameDecoder 發現不是一條完整的數據,則沒有繼續向下傳播

  • 此時 channel2 中也收到了一個半包,因為兩個 channel 使用了同一個 LengthFieldBasedFrameDecoder,存入其中的數據剛好拼湊成了一個完整的數據包。LengthFieldBasedFrameDecoder 讓該數據包繼續向下傳播,最終引發錯誤

為了提高 handler 的復用率,同時又避免出現一些併發問題,Netty 中原生的 handler 中用 @Sharable 註解來標明,該 handler 能否在多個 channel 中共用。

只有帶有該註解,才能通過對象的方式被共用,否則無法被共用

自定義編解碼器能否使用 @Sharable 註解

這需要根據自定義的 handler 的處理邏輯進行分析

我們的 MessageCodec 本身接收的是 LengthFieldBasedFrameDecoder 處理之後的數據,那麼數據肯定是完整的,按分析來說是可以添加 @Sharable 註解的

但是實際情況我們並不能添加該註解,會拋出異常信息 ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared

  • 因為 MessageCodec 繼承自 ByteToMessageCodec,ByteToMessageCodec 類的註解如下

    file

這就意味著 ByteToMessageCodec 不能被多個 channel 所共用的

  • 原因:因為該類的目標是:將 ByteBuf 轉化為 Message,意味著傳進該 handler 的數據還未被處理過。所以傳過來的 ByteBuf 可能並不是完整的數據,如果共用則會出現問題

如果想要共用,需要怎麼辦呢?

繼承 MessageToMessageDecoder 即可。 該類的目標是:將已經被處理的完整數據再次被處理。傳過來的 Message 如果是被處理過的完整數據,那麼被共用也就不會出現問題了,也就可以使用 @Sharable 註解了。實現方式與 ByteToMessageCodec 類似

@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        ...
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
		...
    }
}

本文由傳智教育博學谷教研團隊發佈。

如果本文對您有幫助,歡迎關註點贊;如果您有任何建議也可留言評論私信,您的支持是我堅持創作的動力。

轉載請註明出處!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1. 拷貝和拷貝構造函數 拷貝和複製是一個意思,對應的英文單詞都是copy。對於電腦來說,拷貝是指用一份原有的、已經存在的數據創建出一份新的數據,最終的結果是多了一份相同的數據。例如,將 Word 文檔拷貝到U盤去複印店列印,將 D 盤的圖片拷貝到桌面以方便瀏覽,將重要的文件上傳到百度網盤以防止丟 ...
  • 家居網購項目實現013 以下皆為部分代碼,詳見 https://github.com/liyuelian/furniture_mall.git 32.功能30-會員不能登錄後臺管理 32.1需求分析/圖解 管理員admin登錄後,可以訪問所有頁面 會員登錄後,不能訪問後臺管理相關頁面,其他頁面可以訪 ...
  • 大家好,我是車轍,我的掘金小冊《SkyWalking:應用監控和鏈路跟蹤》已經上線啦,這是我的第一本電子書,歡迎大家訂閱。 整整好是9月的最後一天下午,能按耐住衝動的是少之又少,至於原因嘛你懂的。趕高鐵的準備趕高鐵,沒趕高鐵的也假裝趕高鐵。特別是開發同學,腦門上就差貼張紙條:別打擾我。 現在離跑路時 ...
  • 大多數開發者可能都用過 Postman,根據其官網的介紹:Postman 是一個用於構建和使用 API 的 API 平臺,簡化了 API 生命周期的每個步驟,提供更便捷的團隊協作,因此可以更快地創建更好的 API。這裡的 API,除了我們常用的 HTTP API 之外,還包括 Websocket(B ...
  • 一、前言 redis在我們企業級開發中是很常見的,但是單個redis不能保證我們的穩定使用,所以我們要建立一個集群。 redis有兩種高可用的方案: High availability with Redis Sentinel(哨兵) Scaling with Redis Cluster(分片集群) ...
  • 有問必答 最近有好多讀者私信我,為什麼選擇GoFrame做電商項目的開發? 原因很簡單: 因為我司是用GoFrame做電商業務開發的,而且我司同事基本都是PHP轉Go的。GoFrame可以說是非常適合PHPer轉Gopher的開發框架。 在入職我司之前,我有使用Gin和go-micro框架,目前也正 ...
  • JavaSE:基礎語法 註釋 Java中的註釋有三種: 單行註釋:只能註釋當前行,以//開始,直到行結束 ​ //輸出HelloWorld! 多行註釋:註釋一段文字,以/ * 開始以 * / 結束! ​ /* 這是我們Java程式的主入口, main方法也是程式的主線程。 */ 文檔註釋:用於生產A ...
  • JZ78 把二叉樹列印成多行 題目 給定一個節點數為 n 二叉樹,要求從上到下按層列印二叉樹的 val 值,同一層結點從左至右輸出,每一層輸出一行, 將輸出的結果存放到一個二維數組中返回。 例如:給定的二叉樹是{1,2,3,#,#,4,5} [ [1], [2,3], [4,5] ] 方法 非遞歸層 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...