編程思想:如何設計一個好的通信網路協議

来源:https://www.cnblogs.com/OceanEyes/archive/2020/03/30/protocol_design.html
-Advertisement-
Play Games

當網路中兩個進程需要通信時,我們往往會使用 來實現。 都不陌生。當三次握手成功後,客戶端與服務端就能通信,並且,彼此之間通信的數據包格式都是二進位,由 協議負責傳輸。 當客戶端和服務端取得了二進位數據包後,我們往往需要『萃取』出想要的數據,這樣才能更好的執行業務邏輯。所以,我們需要定義好數據結構來描 ...


當網路中兩個進程需要通信時,我們往往會使用 Socket 來實現。Socket 都不陌生。當三次握手成功後,客戶端與服務端就能通信,並且,彼此之間通信的數據包格式都是二進位,由 TCP/IP 協議負責傳輸。

當客戶端和服務端取得了二進位數據包後,我們往往需要『萃取』出想要的數據,這樣才能更好的執行業務邏輯。所以,我們需要定義好數據結構來描述這些二進位數據的格式,這就是通信網路協議。簡單講,就是需要約定好二進位數據包中每一段位元組的含義,比如從第 n 位元組開始的 m 長度是核心數據,有了這樣的約定後,我們就能解碼出想要的數據,執行業務邏輯,這樣我們就能暢通無阻的通信了。

網路協議的設計

概要劃分

一個最基本的網路協議必須包含

  • 數據的長度
  • 數據

瞭解 TCP 協議的同學一定聽說過粘包、拆包 這兩個術語。因為TCP協議是數據流協議,它的底層根據二進位緩衝區的實際情況進行包的劃分。所以,不可避免的會出現粘包,拆包 現象 。為瞭解決它們,我們的網路協議往往會使用一個 4 位元組的 int 類型來表示數據的大小。比如,Netty 就為我們提供了 LengthFieldBasedFrameDecoder 解碼器,它可以有效的使用自定義長度幀來解決上述問題。

同時一個好的網路協議,還會將動作和業務數據分離。試想一下, HTTP 協議的分為請求頭,請求體——

  • 請求頭:定義了介面地址、Http MethodHTTP 版本
  • 請求體:定義了需要傳遞的數據

這就是一種分離關註點的思想。所以自定義的網路協議也可以包含:

  • 動作指令:比如定義 code 來分門別類的代表不同的業務邏輯
  • 序列化演算法:描述了 JAVA 對象和二進位之間轉換的形式,提供多種序列化/反序列化方式。比如 jsonprotobuf 等等,甚至是自定義演算法。比如:rocketmq 等等。

同時,協議的開頭可以定義一個約定的魔數。這個固定值(4位元組),一般用來判斷當前的數據包是否合法。比如,當我們使用 telnet 發送錯誤的數據包時,很顯然,它不合法,會導致解碼失敗。所以,為了減輕伺服器的壓力,我們可以取出數據包的前4個位元組與固定的魔數對比,如果是非法的格式,直接關閉連接,不繼續解碼。

網路協議結構如下所示

+--------------+-----------+------------+-----------+----------+
| 魔數(4)       | code(1)   |序列化演算法(1) |數據長度(4) |數據(n)   |
+--------------+-----------+------------+-----------+----------+ 

RocketMQ 通信網路協議的實現

RocketMQ 網路協議

這一小節,我們從RocketMQ 中,分析優秀通信網路協議的實現。RocketMQ 項目中,客戶端和服務端的通信是基於 Netty 之上構建的。同時,為了更加有效的通信,往往需要對發送的消息自定義網路協議。

RocketMQ 的網路協議,從數據分類的角度上看,可分為兩大類

  • 消息頭數據(Header Data)
  • 消息體數據(Body Data)

從左到右

  • 第一段:4 個位元組整數,等於2、3、4 長度總和

  • 第二段:4 個位元組整數,等於3 的長度。特別的 byte[0] 代表序列化演算法,byte[1~3]才是真正的長度

  • 第三段:代表消息頭數據,結構如下

{
    "code":0,
    "language":"JAVA",
    "version":0,
    "opaque":0,
    "flag":1,
    "remark":"hello, I am respponse /127.0.0.1:27603",
    "extFields":{
        "count":"0",
        "messageTitle":"HelloMessageTitle"
    }
}
  • 第四段:代表消息體數據

RocketMQ 消息頭協議詳細如下:

Header 欄位名 類型 Request Response
code 整數 請求操作代碼,請求接收方根據不同的代碼做不同的操作 應答結果代碼,0表示成功,非0表示各種錯誤代碼
language 字元串 請求發起方實現語言,預設JAVA 應答接收方實現語言
version 整數 請求發起方程式版本 應答接收方程式版本
opaque 整數 請求發起方在同一連接上不同的請求標識代碼,多線程連接復用使用 應答方不做修改,直接返回
flag 整數 通信層的標誌位 通信層的標誌位
remark 字元串 傳輸自定義文本信息 錯誤詳細描述信息
extFields HashMap<String,String> 請求自定義欄位 應答自定義欄位

編碼過程

RocketMQ 的通信模塊是基於 Netty的。通過定義 NettyEncoder 來實現對每一個 Channel的 出棧數據進行編碼,如下所示:

@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
           ...
        }
    }
}

其中,核心的編碼過程位於 RemotingCommand 對象中,encodeHeader 階段,需要統計出消息總長度,即:

  • 定義消息頭長度,一個整數表示:占4個位元組

  • 定義消息頭數據,並計算其長度

  • 定義消息體數據,並計算其長度

  • 額外再加 4是因為需要加入消息總長度,一個整數表示:占4個位元組

public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> 消息頭長度,一個整數表示:占4個位元組
    int length = 4;

    // 2> 消息頭數據
    byte[] headerData;
    headerData = this.headerEncode();
    // 再加消息頭數據長度
    length += headerData.length;

    // 3> 再加消息體數據長度
    length += bodyLength;
    // 4> 額外加 4是因為需要加入消息總長度,一個整數表示:占4個位元組
    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // 5> 將消息總長度加入 ByteBuffer
    result.putInt(length);

    // 6> 將消息的頭長度加入 ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // 7> 將消息頭數據加入 ByteBuffer
    result.put(headerData);

    result.flip();

    return result;
}

其中,encode 階段會將 CommandCustomHeader 數據轉換 HashMap<String,String>,方便序列化

public void makeCustomHeaderToNet() {
    if (this.customHeader != null) {
        Field[] fields = getClazzFields(customHeader.getClass());
        if (null == this.extFields) {
            this.extFields = new HashMap<String, String>();
        }

        for (Field field : fields) {
            if (!Modifier.isStatic(field.getModifiers())) {
                String name = field.getName();
                if (!name.startsWith("this")) {
                    Object value = null;
                    try {
                        field.setAccessible(true);
                        value = field.get(this.customHeader);
                    } catch (Exception e) {
                        log.error("Failed to access field [{}]", name, e);
                    }

                    if (value != null) {
                        this.extFields.put(name, value.toString());
                    }
                }
            }
        }
    }
}

特別的,消息頭序列化支持兩種演算法:

  • JSON
  • RocketMQ
private byte[] headerEncode() {
    this.makeCustomHeaderToNet();
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
        return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {
        return RemotingSerializable.encode(this);
    }
}

這兒需要值得註意的是,encode階段將當前 RPC 類型和 headerData長度編碼到一個 byte[4] 數組中,byte[0] 位序列化類型。

public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}

其中,通過與運算 & 0xFF 取低八位數據。

所以, 最終 length 長度等於序列化類型 + header length + header data + body data 的位元組的長度。

解碼過程

RocketMQ 解碼通過NettyDecoder來實現,它繼承自 LengthFieldBasedFrameDecoder,其中調用了父類LengthFieldBasedFrameDecoder的構造函數

super(FRAME_MAX_LENGTH, 0, 4, 0, 4);

這些參數設置4個位元組代表 length總長度,同時解碼時跳過最開始的4個位元組:

frame = (ByteBuf) super.decode(ctx, in);

所以,得到的 frame= 序列化類型 + header length + header data + body data 。解碼如下所示:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    //總長度
    int length = byteBuffer.limit();
    //原始的 header length,4位
    int oriHeaderLen = byteBuffer.getInt();
    //真正的 header data 長度。忽略 byte[0]的 serializeType
    int headerLength = getHeaderLength(oriHeaderLen);

    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}

private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
    switch (type) {
        case JSON:
            RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
            resultJson.setSerializeTypeCurrentRPC(type);
            return resultJson;
        case ROCKETMQ:
            RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
            resultRMQ.setSerializeTypeCurrentRPC(type);
            return resultRMQ;
        default:
            break;
    }

    return null;
}

其中,getProtocolType,右移 24位,拿到 serializeType

public static SerializeType getProtocolType(int source) {
    return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}

getHeaderLength 拿到 0-24 位代表的 headerData length:

public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;
}

小結

對於諸多中間件而言,底層的網路通信模塊往往會使用 NettyNetty 提供了諸多的編解碼器,可以快速方便的上手。本文從如何設計一個網路協議入手,最終切入到 RocketMQ 底層網路協議的實現。可以看到,它並不複雜。仔細研讀幾遍變能理解其奧義。具體參考類NettyEncoderNettyDecoderRemotingCommand


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • number類型函數 random() 0-1 random(10) 0-10之間的整數 random(100) 0-100之間的整數 編譯後 list數組相關函數 編譯後 @debug 實時返回列印結果,併在控制台輸出 如現在演示的函數操作,沒有實際的css意義,可以直接在控制台輸出 輸出結果 字 ...
  • 盒模型分為標準盒模型和怪異盒模型,他們的區別在於計算內容區的不同。 我們來看input標簽 這是一個高度相同,邊框相同的搜索框和一個提交按鈕。給他們寫長、相同的高和一個1px的邊框。 看 他們在網頁中顯示的高度現在看起來是不一樣的。不止高度不一樣提交按鈕的長度現在也不是和我定的數值一樣。這個提交按鈕 ...
  • 1 <div style="width: 50%"> 2 <table class="layui-table"> 3 <tbody> 4 <tr> 5 <td>Bud</td> 6 <td>179</td> 7 <td>183</td> 8 <td>44</td> 9 <td>37</td> 10 ...
  • 1、ThymeLeaf+LayUI表格渲染錯誤 使用thymeleafhe+layui渲染表格時,出現錯誤org.thymeleaf.exceptions.TemplateProcessingException: Could not parse as expression: 這是因為[[]]是thy ...
  • 將指定數字插入到數組的末尾,返回值為 將數組的第一個元素刪除並返回,返回值為 ...
  • 簡單繼承: @extend 繼承 編譯後 關聯屬性繼承: 編譯後 鏈式繼承: 編譯後 偽類繼承: 編譯後 sass嵌套 編譯後 相同的屬性值首碼,也可以用嵌套 編譯後 sass條件控制 @if @else if @else 編譯後 條件判斷語句也可以寫在外面 編譯後 迴圈 @for $i from ...
  • 對前端稍微有點瞭解的初學者都知道,JavaScript是必不可少的工具。毫不誇張的說,大部分網頁都使用了JavaScript,想要成為一個優秀的前端工程師,做出漂亮令用戶滿意的網頁,熟練掌握JavaScript是一個必備技能。本文為新手整理了一篇JavaScript基礎教程入門指南,希望可以幫助編程 ...
  • number類型 編譯後 color類型: 編譯後 string類型 編譯後 list數組類型 nth(list, num) 獲取list數組中的下標為num的元素 註意num下標是從1開始的 編譯後 index(list, str) 返回str在list數組中的下標 編譯後 map 數組類型(有點 ...
一周排行
    -Advertisement-
    Play Games
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...
  • 目錄前言PostgreSql安裝測試額外Nuget安裝Person.cs模擬運行Navicate連postgresql解決方案Garnet為什麼要選擇Garnet而不是RedisRedis不再開源Windows版的Redis是由微軟維護的Windows Redis版本老舊,後續可能不再更新Garne ...
  • C#TMS系統代碼-聯表報表學習 領導被裁了之後很快就有人上任了,幾乎是無縫銜接,很難讓我不想到這早就決定好了。我的職責沒有任何變化。感受下來這個系統封裝程度很高,我只要會調用方法就行。這個系統交付之後不會有太多問題,更多應該是做小需求,有大的開發任務應該也是第二期的事,嗯?怎麼感覺我變成運維了?而 ...
  • 我在隨筆《EAV模型(實體-屬性-值)的設計和低代碼的處理方案(1)》中介紹了一些基本的EAV模型設計知識和基於Winform場景下低代碼(或者說無代碼)的一些實現思路,在本篇隨筆中,我們來分析一下這種針對通用業務,且只需定義就能構建業務模塊存儲和界面的解決方案,其中的數據查詢處理的操作。 ...
  • 對某個遠程伺服器啟用和設置NTP服務(Windows系統) 打開註冊表 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\TimeProviders\NtpServer 將 Enabled 的值設置為 1,這將啟用NTP伺服器功 ...
  • title: Django信號與擴展:深入理解與實踐 date: 2024/5/15 22:40:52 updated: 2024/5/15 22:40:52 categories: 後端開發 tags: Django 信號 松耦合 觀察者 擴展 安全 性能 第一部分:Django信號基礎 Djan ...
  • 使用xadmin2遇到的問題&解決 環境配置: 使用的模塊版本: 關聯的包 Django 3.2.15 mysqlclient 2.2.4 xadmin 2.0.1 django-crispy-forms >= 1.6.0 django-import-export >= 0.5.1 django-r ...
  • 今天我打算整點兒不一樣的內容,通過之前學習的TransformerMap和LazyMap鏈,想搞點不一樣的,所以我關註了另外一條鏈DefaultedMap鏈,主要調用鏈為: 調用鏈詳細描述: ObjectInputStream.readObject() DefaultedMap.readObject ...
  • 後端應用級開發者該如何擁抱 AI GC?就是在這樣的一個大的浪潮下,我們的傳統的應用級開發者。我們該如何選擇職業或者是如何去快速轉型,跟上這樣的一個行業的一個浪潮? 0 AI金字塔模型 越往上它的整個難度就是職業機會也好,或者說是整個的這個運作也好,它的難度會越大,然後越往下機會就會越多,所以這是一 ...
  • @Autowired是Spring框架提供的註解,@Resource是Java EE 5規範提供的註解。 @Autowired預設按照類型自動裝配,而@Resource預設按照名稱自動裝配。 @Autowired支持@Qualifier註解來指定裝配哪一個具有相同類型的bean,而@Resourc... ...