當網路中兩個進程需要通信時,我們往往會使用 來實現。 都不陌生。當三次握手成功後,客戶端與服務端就能通信,並且,彼此之間通信的數據包格式都是二進位,由 協議負責傳輸。 當客戶端和服務端取得了二進位數據包後,我們往往需要『萃取』出想要的數據,這樣才能更好的執行業務邏輯。所以,我們需要定義好數據結構來描 ...
當網路中兩個進程需要通信時,我們往往會使用 Socket
來實現。Socket
都不陌生。當三次握手成功後,客戶端與服務端就能通信,並且,彼此之間通信的數據包格式都是二進位,由 TCP/IP
協議負責傳輸。
當客戶端和服務端取得了二進位數據包後,我們往往需要『萃取』出想要的數據,這樣才能更好的執行業務邏輯。所以,我們需要定義好數據結構來描述這些二進位數據的格式,這就是通信網路協議。簡單講,就是需要約定好二進位數據包中每一段位元組的含義,比如從第 n 位元組開始的 m 長度是核心數據,有了這樣的約定後,我們就能解碼出想要的數據,執行業務邏輯,這樣我們就能暢通無阻的通信了。
網路協議的設計
概要劃分
一個最基本的網路協議必須包含
- 數據的長度
- 數據
瞭解 TCP
協議的同學一定聽說過粘包、拆包
這兩個術語。因為TCP
協議是數據流協議,它的底層根據二進位緩衝區的實際情況進行包的劃分。所以,不可避免的會出現粘包,拆包
現象 。為瞭解決它們,我們的網路協議往往會使用一個 4 位元組的 int
類型來表示數據的大小。比如,Netty
就為我們提供了 LengthFieldBasedFrameDecoder
解碼器,它可以有效的使用自定義長度幀來解決上述問題。
同時一個好的網路協議,還會將動作和業務數據分離。試想一下, HTTP
協議的分為請求頭,請求體——
- 請求頭:定義了介面地址、
Http Method
、HTTP
版本 - 請求體:定義了需要傳遞的數據
這就是一種分離關註點的思想。所以自定義的網路協議也可以包含:
- 動作指令:比如定義
code
來分門別類的代表不同的業務邏輯 - 序列化演算法:描述了
JAVA
對象和二進位之間轉換的形式,提供多種序列化/反序列化方式。比如json
、protobuf
等等,甚至是自定義演算法。比如:rocketmq
等等。
同時,協議的開頭可以定義一個約定的魔數
。這個固定值(4位元組),一般用來判斷當前的數據包是否合法。比如,當我們使用 telnet
發送錯誤的數據包時,很顯然,它不合法,會導致解碼失敗。所以,為了減輕伺服器的壓力,我們可以取出數據包的前4
個位元組與固定的魔數
對比,如果是非法的格式,直接關閉連接,不繼續解碼。
網路協議結構如下所示:
+--------------+-----------+------------+-----------+----------+
| 魔數(4) | code(1) |序列化演算法(1) |數據長度(4) |數據(n) |
+--------------+-----------+------------+-----------+----------+
RocketMQ 通信網路協議的實現
RocketMQ 網路協議
這一小節,我們從RocketMQ
中,分析優秀通信網路協議的實現。RocketMQ
項目中,客戶端和服務端的通信是基於 Netty 之上構建的。同時,為了更加有效的通信,往往需要對發送的消息自定義網路協議。
RocketMQ
的網路協議,從數據分類的角度上看,可分為兩大類
- 消息頭數據(Header Data)
- 消息體數據(Body Data)
從左到右
-
第一段:4 個位元組整數,等於2、3、4 長度總和
-
第二段:4 個位元組整數,等於3 的長度。特別的
byte[0]
代表序列化演算法,byte[1~3]
才是真正的長度 -
第三段:代表消息頭數據,結構如下
{
"code":0,
"language":"JAVA",
"version":0,
"opaque":0,
"flag":1,
"remark":"hello, I am respponse /127.0.0.1:27603",
"extFields":{
"count":"0",
"messageTitle":"HelloMessageTitle"
}
}
- 第四段:代表消息體數據
RocketMQ 消息頭協議詳細如下:
Header 欄位名 | 類型 | Request | Response |
---|---|---|---|
code | 整數 | 請求操作代碼,請求接收方根據不同的代碼做不同的操作 | 應答結果代碼,0表示成功,非0表示各種錯誤代碼 |
language | 字元串 | 請求發起方實現語言,預設JAVA | 應答接收方實現語言 |
version | 整數 | 請求發起方程式版本 | 應答接收方程式版本 |
opaque | 整數 | 請求發起方在同一連接上不同的請求標識代碼,多線程連接復用使用 | 應答方不做修改,直接返回 |
flag | 整數 | 通信層的標誌位 | 通信層的標誌位 |
remark | 字元串 | 傳輸自定義文本信息 | 錯誤詳細描述信息 |
extFields | HashMap<String,String> | 請求自定義欄位 | 應答自定義欄位 |
編碼過程
RocketMQ
的通信模塊是基於 Netty
的。通過定義 NettyEncoder
來實現對每一個 Channel
的 出棧數據進行編碼,如下所示:
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
...
}
}
}
其中,核心的編碼過程位於 RemotingCommand
對象中,encodeHeader
階段,需要統計出消息總長度,即:
-
定義消息頭長度,一個整數表示:占4個位元組
-
定義消息頭數據,並計算其長度
-
定義消息體數據,並計算其長度
-
額外再加 4是因為需要加入消息總長度,一個整數表示:占4個位元組
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> 消息頭長度,一個整數表示:占4個位元組
int length = 4;
// 2> 消息頭數據
byte[] headerData;
headerData = this.headerEncode();
// 再加消息頭數據長度
length += headerData.length;
// 3> 再加消息體數據長度
length += bodyLength;
// 4> 額外加 4是因為需要加入消息總長度,一個整數表示:占4個位元組
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// 5> 將消息總長度加入 ByteBuffer
result.putInt(length);
// 6> 將消息的頭長度加入 ByteBuffer
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// 7> 將消息頭數據加入 ByteBuffer
result.put(headerData);
result.flip();
return result;
}
其中,encode
階段會將 CommandCustomHeader
數據轉換 HashMap<String,String>
,方便序列化
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
特別的,消息頭序列化支持兩種演算法:
JSON
RocketMQ
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
這兒需要值得註意的是,encode
階段將當前 RPC
類型和 headerData
長度編碼到一個 byte[4]
數組中,byte[0]
位序列化類型。
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
其中,通過與運算 & 0xFF
取低八位數據。
所以, 最終 length
長度等於序列化類型 + header length + header data + body data 的位元組的長度。
解碼過程
RocketMQ
解碼通過NettyDecoder
來實現,它繼承自 LengthFieldBasedFrameDecoder
,其中調用了父類LengthFieldBasedFrameDecoder
的構造函數
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
這些參數設置4
個位元組代表 length
總長度,同時解碼時跳過最開始的4
個位元組:
frame = (ByteBuf) super.decode(ctx, in);
所以,得到的 frame
= 序列化類型 + header length + header data + body data 。解碼如下所示:
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
//總長度
int length = byteBuffer.limit();
//原始的 header length,4位
int oriHeaderLen = byteBuffer.getInt();
//真正的 header data 長度。忽略 byte[0]的 serializeType
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
其中,getProtocolType
,右移 24
位,拿到 serializeType
:
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
getHeaderLength
拿到 0-24 位代表的 headerData
length:
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
小結
對於諸多中間件而言,底層的網路通信模塊往往會使用 Netty
。Netty
提供了諸多的編解碼器,可以快速方便的上手。本文從如何設計一個網路協議入手,最終切入到 RocketMQ
底層網路協議的實現。可以看到,它並不複雜。仔細研讀幾遍變能理解其奧義。具體參考類NettyEncoder
、NettyDecoder
、RemotingCommand
。