前言 本篇文章主要介紹的是SpringBoot整合Netty以及使用Protobuf進行數據傳輸的相關內容。Protobuf會簡單的介紹下用法,至於Netty在 "之前的文章" 中已經簡單的介紹過了,這裡就不再過多細說了。 Protobuf 介紹 protocolbuffer(以下簡稱PB)是goo ...
前言
本篇文章主要介紹的是SpringBoot整合Netty以及使用Protobuf進行數據傳輸的相關內容。Protobuf會簡單的介紹下用法,至於Netty在之前的文章中已經簡單的介紹過了,這裡就不再過多細說了。
Protobuf
介紹
protocolbuffer(以下簡稱PB)是google 的一種數據交換的格式,它獨立於語言,獨立於平臺。google 提供了多種語言的實現:java、c#、c++、go 和python,每一種實現都包含了相應語言的編譯器以及庫文件。由於它是一種二進位的格式,比使用 xml進行數據交換快許多。可以把它用於分散式應用之間的數據通信或者異構環境下的數據交換。作為一種效率和相容性都很優秀的二進位數據傳輸格式,可以用於諸如網路傳輸、配置文件、數據存儲等諸多領域。
官方地址: https://github.com/google/protobuf
使用
這裡的使用就只介紹Java相關的使用。
首先我們需要建立一個proto文件,在該文件定義我們需要傳輸的文件。
例如我們需要定義一個用戶的信息,包含的欄位主要有編號、名稱、年齡。
那麼該protobuf文件的格式如下:
註:這裡使用的是proto3,相關的註釋我已寫了,這裡便不再過多講述了。需要註意一點的是proto文件和生成的Java文件名稱不能一致!
syntax = "proto3";
// 生成的包名
option java_package="com.pancm.protobuf";
//生成的java名
option java_outer_classname = "UserInfo";
message UserMsg {
// ID
int32 id = 1;
// 姓名
string name = 2;
// 年齡
int32 age = 3;
// 狀態
int32 state = 4;
}
創建好該文件之後,我們把該文件和protoc.exe(生成Java文件的軟體)放到E盤目錄下的protobuf文件夾下,然後再到該目錄的dos界面下輸入:protoc.exe --java_out=文件絕對路徑名稱
。
例如:
protoc.exe --java_out=E:\protobuf User.proto
輸入完之後,回車即可在同級目錄看到已經生成好的Java文件,然後將該文件放到項目中該文件指定的路徑下即可。
註:生成protobuf的文件軟體和測試的protobuf文件我也整合到該項目中了,可以直接獲取的。
Java文件生成好之後,我們再來看怎麼使用。
這裡我就直接貼代碼了,並且將註釋寫在代碼中,應該更容易理解些吧。。。
代碼示例:
// 按照定義的數據結構,創建一個對象
UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();
userInfo.setId(1);
userInfo.setName("xuwujing");
userInfo.setAge(18);
UserInfo.UserMsg userMsg = userInfo.build();
// 將數據寫到輸出流
ByteArrayOutputStream output = new ByteArrayOutputStream();
userMsg.writeTo(output);
// 將數據序列化後發送
byte[] byteArray = output.toByteArray();
// 接收到流並讀取
ByteArrayInputStream input = new ByteArrayInputStream(byteArray);
// 反序列化
UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);
System.out.println("id:" + userInfo2.getId());
System.out.println("name:" + userInfo2.getName());
System.out.println("age:" + userInfo2.getAge());
註:這裡說明一點,因為protobuf是通過二進位進行傳輸,所以需要註意下相應的編碼。還有使用protobuf也需要註意一下一次傳輸的最大位元組長度。
輸出結果:
id:1
name:xuwujing
age:18
SpringBoot整合Netty
說明:如果想直接獲取工程那麼可以直接跳到底部,通過鏈接下載工程代碼。
開發準備
環境要求
JDK::1.8
Netty::4.0或以上(不包括5)
Protobuf:3.0或以上
如果對Netty不熟的話,可以看看我之前寫的一些文章。大神請無視~。~
地址:https://blog.csdn.net/column/details/17640.html
首先還是Maven的相關依賴:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<netty.version>4.1.22.Final</netty.version>
<protobuf.version>3.5.1</protobuf.version>
<springboot>1.5.9.RELEASE</springboot>
<fastjson>1.2.41</fastjson>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${springboot}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<version>${springboot}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
添加了相應的maven依賴之後,配置文件這塊暫時沒有什麼可以添加的,因為暫時就一個監聽的埠而已。
代碼編寫
代碼模塊主要分為服務端和客戶端。
主要實現的業務邏輯:
服務端啟動成功之後,客戶端也啟動成功,這時服務端會發送一條protobuf格式的信息給客戶端,然後客戶端給予相應的應答。客戶端與服務端連接成功之後,客戶端每個一段時間會發送心跳指令給服務端,告訴服務端該客戶端還存過中,如果客戶端沒有在指定的時間發送信息,服務端會關閉與該客戶端的連接。當客戶端無法連接到服務端之後,會每隔一段時間去嘗試重連,只到重連成功!
服務端
首先是編寫服務端的啟動類,相應的註釋在代碼中寫得很詳細了,這裡也不再過多講述了。不過需要註意的是,在之前的我寫的Netty文章中,是通過main方法直接啟動服務端,因此是直接new一個對象的。而在和SpringBoot整合之後,我們需要將Netty交給springBoot去管理,所以這裡就用了相應的註解。
代碼如下:
@Service("nettyServer")
public class NettyServer {
private static final int port = 9876; // 設置服務端埠
private static EventLoopGroup boss = new NioEventLoopGroup(); // 通過nio方式來接收連接和處理連接
private static EventLoopGroup work = new NioEventLoopGroup(); // 通過nio方式來接收連接和處理連接
private static ServerBootstrap b = new ServerBootstrap();
@Autowired
private NettyServerFilter nettyServerFilter;
public void run() {
try {
b.group(boss, work);
b.channel(NioServerSocketChannel.class);
b.childHandler(nettyServerFilter); // 設置過濾器
// 伺服器綁定埠監聽
ChannelFuture f = b.bind(port).sync();
System.out.println("服務端啟動成功,埠是:" + port);
// 監聽伺服器關閉監聽
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 關閉EventLoopGroup,釋放掉所有資源包括創建的線程
work.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
服務端主類編寫完畢之後,我們再來設置下相應的過濾條件。
這裡需要繼承Netty中ChannelInitializer類,然後重寫initChannel該方法,進行添加相應的設置,如心跳超時設置,傳輸協議設置,以及相應的業務實現類。
代碼如下:
@Component
public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
@Autowired
private NettyServerHandler nettyServerHandler;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
//入參說明: 讀超時時間、寫超時時間、所有類型的超時時間、時間格式
ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// 解碼和編碼,應和客戶端一致
//傳輸的協議 Protobuf
ph.addLast(new ProtobufVarint32FrameDecoder());
ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
ph.addLast(new ProtobufVarint32LengthFieldPrepender());
ph.addLast(new ProtobufEncoder());
//業務邏輯實現類
ph.addLast("nettyServerHandler", nettyServerHandler);
}
}
服務相關的設置的代碼寫完之後,我們再來編寫主要的業務代碼。
使用Netty編寫業務層的代碼,我們需要繼承ChannelInboundHandlerAdapter 或SimpleChannelInboundHandler類,在這裡順便說下它們兩的區別吧。
繼承SimpleChannelInboundHandler類之後,會在接收到數據後會自動release掉數據占用的Bytebuffer資源。並且繼承該類需要指定數據格式。
而繼承ChannelInboundHandlerAdapter則不會自動釋放,需要手動調用ReferenceCountUtil.release()等方法進行釋放。繼承該類不需要指定數據格式。
所以在這裡,個人推薦服務端繼承ChannelInboundHandlerAdapter,手動進行釋放,防止數據未處理完就自動釋放了。而且服務端可能有多個客戶端進行連接,並且每一個客戶端請求的數據格式都不一致,這時便可以進行相應的處理。
客戶端根據情況可以繼承SimpleChannelInboundHandler類。好處是直接指定好傳輸的數據格式,就不需要再進行格式的轉換了。
代碼如下:
@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/** 空閑次數 */
private int idle_count = 1;
/** 發送次數 */
private int count = 1;
/**
* 建立連接時,發送一條消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("連接的客戶端地址:" + ctx.channel().remoteAddress());
UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
.build();
ctx.writeAndFlush(userMsg);
super.channelActive(ctx);
}
/**
* 超時處理 如果5秒沒有接受客戶端的心跳,就觸發; 如果超過兩次,則直接關閉;
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.READER_IDLE.equals(event.state())) { // 如果讀通道處於空閑狀態,說明沒有接收到心跳命令
System.out.println("已經5秒沒有接收到客戶端的信息了");
if (idle_count > 1) {
System.out.println("關閉這個不活躍的channel");
ctx.channel().close();
}
idle_count++;
}
} else {
super.userEventTriggered(ctx, obj);
}
}
/**
* 業務邏輯處理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("第" + count + "次" + ",服務端接受的消息:" + msg);
try {
// 如果是protobuf類型的數據
if (msg instanceof UserMsg) {
UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
if (userState.getState() == 1) {
System.out.println("客戶端業務處理成功!");
} else if(userState.getState() == 2){
System.out.println("接受到客戶端發送的心跳!");
}else{
System.out.println("未知命令!");
}
} else {
System.out.println("未知數據!" + msg);
return;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
count++;
}
/**
* 異常處理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
還有個服務端的啟動類,之前是通過main方法直接啟動, 不過這裡改成了通過springBoot進行啟動,差別不大。
代碼如下:
@SpringBootApplication
public class NettyServerApp {
public static void main(String[] args) {
// 啟動嵌入式的 Tomcat 並初始化 Spring 環境及其各 Spring 組件
ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
NettyServer nettyServer = context.getBean(NettyServer.class);
nettyServer.run();
}
}
到這裡服務端相應的代碼就編寫完畢了。
客戶端
客戶端這邊的代碼和服務端的很多地方都類似,我就不再過多細說了,主要將一些不同的代碼拿出來簡單的講述下。
首先是客戶端的主類,基本和服務端的差不多,也就是多了監聽的埠和一個監聽器(用來監聽是否和服務端斷開連接,用於重連)。
主要實現的代碼邏輯如下:
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
ChannelFuture f = null;
try {
if (bootstrap != null) {
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(nettyClientFilter);
bootstrap.remoteAddress(host, port);
f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
final EventLoop eventLoop = futureListener.channel().eventLoop();
if (!futureListener.isSuccess()) {
System.out.println("與服務端斷開連接!在10s之後準備嘗試重連!");
eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
}
});
if(initFalg){
System.out.println("Netty客戶端啟動成功!");
initFalg=false;
}
// 阻塞
f.channel().closeFuture().sync();
}
} catch (Exception e) {
System.out.println("客戶端連接失敗!"+e.getMessage());
}
}
註:監聽器這塊的實現用的是JDK1.8的寫法。
客戶端過濾其這塊基本和服務端一直。不過需要註意的是,傳輸協議、編碼和解碼應該一致,還有心跳的讀寫時間應該小於服務端所設置的時間。
改動的代碼如下:
ChannelPipeline ph = ch.pipeline();
/*
* 解碼和編碼,應和服務端一致
* */
//入參說明: 讀超時時間、寫超時時間、所有類型的超時時間、時間格式
ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
客戶端的業務代碼邏輯。
主要實現的幾點邏輯是心跳按時發送以及解析服務發送的protobuf格式的數據。
這裡比服務端多個個註解, 該註解Sharable主要是為了多個handler可以被多個channel安全地共用,也就是保證線程安全。
廢話就不多說了,代碼如下:
@Service("nettyClientHandler")
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyClient nettyClient;
/** 迴圈次數 */
private int fcount = 1;
/**
* 建立連接時
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("建立連接時:" + new Date());
ctx.fireChannelActive();
}
/**
* 關閉連接時
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("關閉連接時:" + new Date());
final EventLoop eventLoop = ctx.channel().eventLoop();
nettyClient.doConnect(new Bootstrap(), eventLoop);
super.channelInactive(ctx);
}
/**
* 心跳請求處理 每4秒發送一次心跳請求;
*
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
System.out.println("迴圈請求的時間:" + new Date() + ",次數" + fcount);
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果寫通道處於空閑狀態,就發送心跳命令
UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
ctx.channel().writeAndFlush(userState);
fcount++;
}
}
}
/**
* 業務邏輯處理
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如果不是protobuf類型的數據
if (!(msg instanceof UserMsg)) {
System.out.println("未知數據!" + msg);
return;
}
try {
// 得到protobuf的數據
UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
// 進行相應的業務處理。。。
// 這裡就從簡了,只是列印而已
System.out.println(
"客戶端接受到的用戶信息。編號:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年齡:" + userMsg.getAge());
// 這裡返回一個已經接受到數據的狀態
UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
ctx.writeAndFlush(userState);
System.out.println("成功發送給服務端!");
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}
}
那麼到這裡客戶端的代碼也編寫完畢了。
功能測試
首先啟動服務端,然後再啟動客戶端。
我們來看看結果是否如上述所說。
服務端輸出結果:
服務端啟動成功,埠是:9876
連接的客戶端地址:/127.0.0.1:53319
第1次,服務端接受的消息:state: 1
客戶端業務處理成功!
第2次,服務端接受的消息:state: 2
接受到客戶端發送的心跳!
第3次,服務端接受的消息:state: 2
接受到客戶端發送的心跳!
第4次,服務端接受的消息:state: 2
接受到客戶端發送的心跳!
客戶端輸入結果:
Netty客戶端啟動成功!
建立連接時:Mon Jul 16 23:31:58 CST 2018
客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18
成功發送給服務端!
迴圈請求的時間:Mon Jul 16 23:32:02 CST 2018,次數1
迴圈請求的時間:Mon Jul 16 23:32:06 CST 2018,次數2
迴圈請求的時間:Mon Jul 16 23:32:10 CST 2018,次數3
迴圈請求的時間:Mon Jul 16 23:32:14 CST 2018,次數4
通過列印信息可以看出如上述所說。
接下來我們再來看看客戶端是否能夠實現重連。
先啟動客戶端,再啟動服務端。
客戶端輸入結果:
Netty客戶端啟動成功!
與服務端斷開連接!在10s之後準備嘗試重連!
客戶端連接失敗!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
建立連接時:Mon Jul 16 23:41:33 CST 2018
客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18
成功發送給服務端!
迴圈請求的時間:Mon Jul 16 23:41:38 CST 2018,次數1
迴圈請求的時間:Mon Jul 16 23:41:42 CST 2018,次數2
迴圈請求的時間:Mon Jul 16 23:41:46 CST 2018,次數3
服務端輸出結果:
服務端啟動成功,埠是:9876
連接的客戶端地址:/127.0.0.1:53492
第1次,服務端接受的消息:state: 1
客戶端業務處理成功!
第2次,服務端接受的消息:state: 2
接受到客戶端發送的心跳!
第3次,服務端接受的消息:state: 2
接受到客戶端發送的心跳!
第4次,服務端接受的消息:state: 2
結果也如上述所說!
其它
關於SpringBoot整合Netty使用Protobuf進行數據傳輸到這裡就結束了。
SpringBoot整合Netty使用Protobuf進行數據傳輸的項目工程地址:
https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf
對了,也有不使用springBoot整合的Netty項目工程地址:
https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf
原創不易,如果感覺不錯,希望給個推薦!您的支持是我寫作的最大動力!
版權聲明:
作者:虛無境
博客園出處:http://www.cnblogs.com/xuwujing
CSDN出處:http://blog.csdn.net/qazwsxpcm
個人博客出處:http://www.panchengming.com