電腦組成原理 哈工大 劉巨集偉 b站課程地址:https://www.bilibili.com/video/BV1t4411e7LH/?spm_id_from=333.337.search-card.all.click 編譯原理 哈工大 陳鄞 b站課程地址:https://www.bilibili. ...
前言
本周繼續學習尼恩編著的《Netty、Redis、ZooKeeper高併發實戰》,一些資源也貼在這裡,自己以後想看還可以找到,這個是在博客園的一個入口https://www.cnblogs.com/crazymakercircle/p/9904544.html。
這周主要學習了Netty客戶端和服務端通信,書是由淺入深的在進行,從Socket NOI通信到 Reactor反應器模式,再到Netty框架,示例代碼都在https://gitee.com/crazymaker/netty_redis_zookeeper_source_code.git 中可以看到,書結合源代碼,自己在動手試驗一下,感覺還是有些收穫。 今天的示例代碼就是實踐出一個客戶端和服務端傳遞protobuf的例子。
Netty 服務端
先來看一下服務端代碼,
@Slf4j
public class ProtoBufServer {
private final int serverPort;
ServerBootstrap b = new ServerBootstrap();
public ProtoBufServer(int port) {
this.serverPort = port;
}
public void runServer() {
//創建reactor 線程組
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 設置reactor 線程組
b.group(bossLoopGroup, workerLoopGroup);
//2 設置nio類型的channel
b.channel(NioServerSocketChannel.class);
//3 設置監聽埠
b.localAddress(serverPort);
//4 設置通道的參數
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 裝配子通道流水線
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有連接到達時會創建一個channel
protected void initChannel(SocketChannel ch) throws Exception {
// pipeline管理子通道channel中的Handler
// 向子channel流水線添加3個handler處理器
// protobufDecoder僅僅負責編碼,並不支持讀半包,所以在之前,一定要有讀半包的處理器。
// 有三種方式可以選擇:
// 使用netty提供ProtobufVarint32FrameDecoder
// 繼承netty提供的通用半包處理器 LengthFieldBasedFrameDecoder
// 繼承ByteToMessageDecoder類,自己處理半包
// 半包的處理
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
// 需要解碼的目標類
ch.pipeline().addLast(new ProtobufDecoder(MarketPriceProto.MarketPrice.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufBussinessHandler());
}
});
// 6 開始綁定server
// 通過調用sync同步方法阻塞直到綁定成功
ChannelFuture channelFuture = b.bind().sync();
log.info(" 伺服器啟動成功,監聽埠: " +
channelFuture.channel().localAddress());
// 7 等待通道關閉的非同步任務結束
// 服務監聽通道會一直等待通道關閉的非同步任務結束
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8 優雅關閉EventLoopGroup,
// 釋放掉所有資源包括創建的線程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
//伺服器端業務處理器
static class ProtobufBussinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MarketPriceProto.MarketPrice protoMsg = (MarketPriceProto.MarketPrice) msg;
//經過pipeline的各個decoder,到此Person類型已經可以斷定
log.info("收到一個 MsgProtos.Msg 數據包 =》");
log.info("protoMsg.getId():=" + protoMsg.getId());
log.info("protoMsg.getClose():=" + protoMsg.getClose());
}
}
public static void main(String[] args) throws InterruptedException {
int port = SERVER_PORT;
new ProtoBufServer(port).runServer();
}
}
代碼中有註釋解釋,我在這裡加一下說明
代碼中有兩個EventLoopGroup bossLoopGroup和EventLoopGroup workerLoopGroup,使用兩個是什麼原因呢? 一個是負責處理連接監聽事件, 一個負責處理數據IO事件和Handler業務處理,通俗點解釋就是一個負責接客,一個負責服務客戶。如果只有一個人就會忙不過來,讓後面的人等很久。
b.childHandler這個就是我們具體的如何處理接收到的消息,他們都繼承ChannelInboundHandlerAdapter,通過PipeLine把消息進行處理。我們從通道裡面拿到的都是位元組碼,那麼要轉成我們需要的Protobuf類,就需要用到這些處理類
前兩個處理類ProtobufVarint32FrameDecoder和ProtobufDecoder都是Netty提供的,一個是為瞭解決半包問題,半包問題是因為在TCP傳輸的時候對數據包進行了拆包或者分包,收到的時候如果直接處理,就會有問題,需要我們在應用層進行二次封裝。 在這個示例中如果我們不使用ProtobufVarint32FrameDecoder,客戶端也不用,那麼就會出現有的可以解析出來,有的報錯的情況: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
可以用搬家來打比分,我們把一個家從一個地方搬到另外一個地方,還要求佈局一樣,汽車運輸的時候可能要分好幾次,那麼我們可以先記住位置,然後隨便裝車搬過去,過去後先暫存,再按記錄的位置進行還原,這樣來保證一模一樣。
ProtobufBussinessHandler 是我們自定義的Handler繼承了ChannelInboundHandlerAdapter,通過它我們拿到channel裡面的數據,轉換成我們的具體的Protobuf對象。這裡只是簡單的列印出來。 如果要把它繼續傳下去,需要調用 super.channelRead(ctx,msg)傳遞下去。
客戶端
@Slf4j
public class ProtoBufScanClient {
private int serverPort;
private String serverIp;
Bootstrap b = new Bootstrap();
public ProtoBufScanClient(String ip, int port) {
this.serverPort = port;
this.serverIp = ip;
}
public void runClient() {
//創建reactor 線程組
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 設置reactor 線程組
b.group(workerLoopGroup);
//2 設置nio類型的channel
b.channel(NioSocketChannel.class);
//3 設置監聽埠
b.remoteAddress(serverIp, serverPort);
//4 設置通道的參數
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 裝配通道流水線
b.handler(new ChannelInitializer<SocketChannel>() {
//初始化客戶端channel
protected void initChannel(SocketChannel ch) throws Exception {
// 客戶端channel流水線添加2個handler處理器
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
}
});
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener) ->
{
if (futureListener.isSuccess()) {
log.info("EchoClient客戶端連接成功!");
} else {
log.info("EchoClient客戶端連接失敗!");
}
});
// 阻塞,直到連接完成
f.sync();
Channel channel = f.channel();
Scanner scanner = new Scanner(System.in);
log.info("請輸入發送內容:");
GenericFutureListener sendCallBack = new GenericFutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
log.info("發送成功!");
} else {
log.info("發送失敗!");
}
}
};
while (scanner.hasNext()) {
//獲取輸入的內容
String next = scanner.next();
String[] values = next.split(",");
if(values.length != 5)
{
log.info("格式不正確!");
}
else {
MarketPriceProto.MarketPrice msg = build(values);
ChannelFuture writeAndFlushFuture = channel.writeAndFlush(msg);
writeAndFlushFuture.addListener(sendCallBack);
}
log.info("請輸入發送內容:");
}
channel.flush();
// 7 等待通道關閉的非同步任務結束
// 服務監聽通道會一直等待通道關閉的非同步任務結束
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 優雅關閉EventLoopGroup,
// 釋放掉所有資源包括創建的線程
workerLoopGroup.shutdownGracefully();
}
}
//構建ProtoBuf對象
public MarketPriceProto.MarketPrice build(String[] values) {
MarketPriceProto.MarketPrice.Builder builder = MarketPriceProto.MarketPrice.newBuilder();
builder.setId(values[0]);
builder.setOpen(Double.valueOf(values[1]));
builder.setHigh(Double.valueOf(values[2]));
builder.setLow(Double.valueOf(values[3]));
builder.setClose(Double.valueOf(values[4]));
return builder.build();
}
public static void main(String[] args) throws InterruptedException {
int port = SERVER_PORT;
String ip = SOCKET_SERVER_IP;
new ProtoBufScanClient(ip, port).runClient();
}
}
Netty客戶端代碼和服務端很接近,這裡它只用了一個線程組,客戶端只有它一個使用,和服務端模式不一樣,一個就夠了。
客戶端讀取控制台輸入數據,然後構造成MarketPriceProto.MarketPrice,它的Proto定義如下
// [開始聲明]
syntax = "proto3";
//定義protobuf的包名稱空間
package com.ken.netty.protocol;
// [結束聲明]
// [開始 java 選項配置]
option java_package = "com.ken.netty.protocol";
option java_outer_classname = "MarketPriceProto";
// [結束 java 選項配置]
// [開始 消息定義]
message MarketPrice {
string id = 1;
double open = 2;
double high = 3;
double low = 4;
double close = 5;
}
channel.writeAndFlush(msg), 這裡我們直接把Protobuf對象傳遞進channel, ProtobufEncoder會對它進行編碼。
總結
代碼比較簡單,這是在有書的幫助和源代碼幫助下實現的,不通過書這個也不是這麼容易理解的。 作者還實現了一個及時通信的例子https://gitee.com/crazymaker/SimpleCrayIM。 也需要花些時間來學習一下。