HTTP協議 客戶機與服務端之間的數據交互需要遵守一定的約定,比如協議版本,數據類型,是否有緩存,是否有壓縮等,只有在這些約定的基礎上才能相互之間愉快的工作。 Netty通信過程中的編解碼 這時說的是基於TCP/IP的Netty之間的通信。TCP/IP協議下客戶端與服務端之間要進行數據交互,一般需要 ...
HTTP協議
客戶機與服務端之間的數據交互需要遵守一定的約定,比如協議版本,數據類型,是否有緩存,是否有壓縮等,只有在這些約定的基礎上才能相互之間愉快的工作。
Netty通信過程中的編解碼
這時說的是基於TCP/IP的Netty之間的通信。TCP/IP協議下客戶端與服務端之間要進行數據交互,一般需要將數據轉換成二進位格式,直接傳java bean是不能支持的。在RPC模式下客戶端在向服務端發起請求前需要將數據做編碼,服務端在接收客戶端發的數據後需要做解碼之後才能正常工作。
- 解碼流程
- 編碼流程
Netty 私有協議棧
為了更好的控制RPC客戶端與服務端之間的通信,也可以編寫私有的協議棧來支撐。
定義消息體
類似HTTP協議,包含頭信息以及內容信息。
public class RpcMessage implements Serializable {
private RpcMessageHeader messageHeader;
private Object messageBody;
}
頭信息,包含內容體長度,消息類型等信息。可以根據消息類型來做不同的業務,比如區分是心跳信息還是業務或者是監控之類的信息。
public class RpcMessageHeader implements Serializable {
private int length;
private int type;
}
定義解碼器
因為TCP/IP協議容易出現粘包拆包現象,這裡為了簡單直接選擇繼承組件提供的LengthFieldBasedFrameDecoder,只需要重寫下麵的方法即可:
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame=(ByteBuf)super.decode(ctx,in);
if(null==frame){
return null;
}
RpcMessage message=new RpcMessage();
RpcMessageHeader messageHeader=new RpcMessageHeader();
messageHeader.setLength(frame.readInt());
message.setMessageHeader(messageHeader);
byte[] data = new byte[message.getMessageHeader().getLength()];
frame.readBytes(data);
Object obj = ProtoStuffSerializeUtil.deserialize(data, genericClass);
message.setMessageBody(obj);
return message;
}
定義編碼器
編碼器繼承MessageToByteEncoder,將對象轉換成位元組的編碼器
public class RpcEncoder extends MessageToByteEncoder<RpcMessage>
重點是下麵的編碼函數,在ByteBuf中輸出數據長度以及數據體,如有其它需要可以補充其它的欄位,比如消息類型。
public void encode(ChannelHandlerContext ctx, RpcMessage in, ByteBuf out) throws Exception {
if(null==in){
throw new RpcException("RpcMessage is null");
}
if (genericClass.isInstance(in.getMessageBody())) {
byte[] data = ProtoStuffSerializeUtil.serialize(in.getMessageBody());
out.writeInt(data.length);
out.writeBytes(data);
}
}
ServerHandle
- 修改服務端執行器消息實體類型為新定義的RpcMessage
public class RpcServerInvoker extends AbstractInvoker<RpcMessage>
- 修改服務端回調
從服務端方法獲取到返回的結果後,重新封裝成消息對象(RpcMessage)發送給客戶端。
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) {
this.executor.execute(new Runnable() {
@Override
public void run() {
RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);
RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation((RpcRequest) message.getMessageBody()));
RpcMessage responseMessage=new RpcMessage();
byte[] data = ProtoStuffSerializeUtil.serialize(response);
RpcMessageHeader messageHeader=new RpcMessageHeader();
messageHeader.setLength(data.length);
responseMessage.setMessageHeader(messageHeader);
responseMessage.setMessageBody(response);
channelHandlerContext.writeAndFlush(responseMessage);
}
});
}
ClientHandle
- 修改客戶端執行器消息實體類型為新定義的RpcMessage
public class RpcClientInvoker extends AbstractInvoker<RpcMessage>
- 修改客戶端回調方法
接收的返回結果修改為RpcMessage,從body屬性中獲取原來的RpcResponse對象
public void channelRead0(ChannelHandlerContext ctx, RpcMessage message) {
RpcResponse response=(RpcResponse) message.getMessageBody();
String requestId = response.getRequestId();
ResponseFuture responseFuture = pendingRPC.get(requestId);
if (responseFuture != null) {
pendingRPC.remove(requestId);
responseFuture.done(response);
}
}
- 修改發送請求的消息對象,組裝成RpcMessage發送
public ResponseFuture invoke(RpcInvocation invocation) {
RpcRequest request=this.getRpcRequest();
ResponseFuture responseFuture = new ResponseFuture(request);
pendingRPC.put(request.getRequestId(), responseFuture);
RpcMessage message=new RpcMessage();
byte[] data = ProtoStuffSerializeUtil.serialize(request);
RpcMessageHeader messageHeader=new RpcMessageHeader();
messageHeader.setLength(data.length);
message.setMessageHeader(messageHeader);
message.setMessageBody(request);
channel.writeAndFlush(message);
return responseFuture;
}
本文源碼
https://github.com/jiangmin168168/jim-framework
文中代碼是依賴上述項目的,如果有不明白的可下載源碼
引用
- 文中插圖來自來網路
- 文中的思路參考了Netty權威指南