1、netty如何解析多協議 前提: 項目地址:https://gitee.com/q529075990qqcom/NB-IOT.git 我們需要創建一個maven項目,這個項目是我已經寫好的項目,項目結構圖如下: 創建公共模塊 創建子模塊,準備好依賴Netty4.1版本 <dependencies ...
1、netty如何解析多協議
前提:
項目地址:https://gitee.com/q529075990qqcom/NB-IOT.git
我們需要創建一個maven項目,這個項目是我已經寫好的項目,項目結構圖如下:
創建公共模塊
創建子模塊,準備好依賴Netty4.1版本
<dependencies>
    <!-- Netty 4.1: networking framework used by the server and the clients -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.72.Final</version>
    </dependency>
    <!-- SLF4J logging API plus the simple console binding -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.28</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.28</version>
    </dependency>
    <!-- Lombok: pinned to a concrete version; the RELEASE meta-version makes builds non-reproducible -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>compile</scope>
    </dependency>
    <!-- Kryo: binary serialization of the protocol objects -->
    <dependency>
        <groupId>com.esotericsoftware</groupId>
        <artifactId>kryo</artifactId>
        <version>5.3.0</version>
    </dependency>
</dependencies>
<!-- Caption: Maven dependencies -->
序列化的定義是:將一個對象編碼成一個位元組流(I/O);而與之相反的操作被稱為反序列化。
package serializer;

/**
 * Contract for turning objects into byte arrays and back again.
 *
 * @author quliang
 * @since 2022-10-20
 */
public interface Serializer {

    /**
     * Serializes the given object.
     *
     * @param obj the object to encode
     * @return the encoded bytes
     * @throws Exception if the object cannot be encoded
     */
    byte[] serialize(Object obj) throws Exception;

    /**
     * Deserializes an object of the requested type.
     *
     * @param bytes the encoded bytes
     * @param clazz the type to decode into
     * @param <T>   the decoded type
     * @return the decoded object
     * @throws Exception if the bytes cannot be decoded
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;
}
// Caption: custom serialization interface
package serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * {@link Serializer} implementation backed by Kryo.
 *
 * <p>A {@link Kryo} instance is kept per thread in a {@link ThreadLocal} and reused across
 * calls, so this class can be called from several threads at once.
 *
 * @author quliang
 * @since 2022-10-20
 */
public class KryoSerializer implements Serializer {

    /** One lazily created Kryo instance per thread, reused for every call on that thread. */
    private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.setReferences(true);
        kryo.setRegistrationRequired(false);
        ((DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
                .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
        return kryo;
    });

    /**
     * Serializes {@code obj} with Kryo.
     *
     * @param obj the object to encode
     * @return the complete encoded form of {@code obj}
     * @throws Exception if closing the underlying stream fails
     */
    @Override
    public byte[] serialize(Object obj) throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             Output output = new Output(baos)) {
            kryoThreadLocal.get().writeObject(output, obj);
            // Output buffers internally and spills to the wrapped stream when the buffer fills up.
            // Flush and read from the stream so large objects are returned in full rather than
            // only the part that is still sitting in Output's buffer.
            output.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new Exception("序列化失敗", e);
        }
    }

    /**
     * Deserializes an instance of {@code clazz} from {@code bytes} with Kryo.
     *
     * @param bytes the encoded bytes
     * @param clazz the type to decode into
     * @param <T>   the decoded type
     * @return the decoded object
     * @throws Exception if closing the underlying stream fails; the original cause is attached
     */
    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
             Input input = new Input(bais)) {
            return kryoThreadLocal.get().readObject(input, clazz);
        } catch (IOException e) {
            throw new Exception("反序化失敗", e);
        }
    }
}
// Caption: Kryo implementation of the serialization interface
我們需要解析兩種協議,那我們就要提前定義好兩種協議,分別是消息協議、登錄協議
消息協議相關
package protocol.msg; import lombok.Data; import lombok.Getter; /** * @description: 消息協議: |magic|version|data| * @author: quliang * @create: 2022-12-10 20:46 **/ @Data public class MsgProtocol { @Getter private byte magic=0; @Getter private byte version=1; }消息協議基類
package protocol.msg.request; import lombok.Data; import protocol.msg.MsgProtocol; /** * @description: * @author: quliang * @create: 2022-12-10 20:58 **/ @Data public class MsgRequest extends MsgProtocol { private String msg; }消息請求子類
package protocol.msg.response; import lombok.Data; import protocol.msg.MsgProtocol; /** * @description: * @author: quliang * @create: 2022-12-10 20:41 **/ @Data public class MsgResponse extends MsgProtocol { private int statCode; }消息響應子類
package encoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import protocol.msg.MsgProtocol; import serializer.KryoSerializer; /** * @description: * @author: quliang * @create: 2022-12-10 20:53 **/ public class MsgEncoder extends MessageToByteEncoder<MsgProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MsgProtocol msgProtocol, ByteBuf in) throws Exception { in.writeByte(msgProtocol.getMagic()); // in.writeByte(msgProtocol.code()); in.writeByte(msgProtocol.getVersion()); byte[] data = new KryoSerializer().serialize(msgProtocol); in.writeShort(data.length); in.writeBytes(data); } }消息協議編碼
package decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.MsgProtocol;
import serializer.KryoSerializer;

import java.util.List;

/**
 * Decodes frames of the message protocol: magic (1 byte), version (1 byte),
 * payload length (2 bytes), payload.
 *
 * <p>When a frame cannot be decoded, the undecoded bytes are handed on unchanged as a
 * {@link ByteBuf} so that a following decoder in the pipeline can try them.
 *
 * @author quliang
 * @since 2022-12-10
 */
@Slf4j
public class MsgDecoder extends ByteToMessageDecoder {

    /** Concrete message type the payload is deserialized into. */
    private final Class<? extends MsgProtocol> msgClass;

    /** Stateless serializer used for every frame. */
    private final KryoSerializer serializer = new KryoSerializer();

    /**
     * @param clazz the concrete {@link MsgProtocol} type to deserialize payloads into
     */
    public MsgDecoder(Class<? extends MsgProtocol> clazz) {
        this.msgClass = clazz;
    }

    /**
     * Tries to decode one frame from {@code in}; on failure forwards the raw bytes instead.
     *
     * @param ctx the handler context (unused)
     * @param in  the accumulated inbound bytes
     * @param out receives either the decoded message or a retained duplicate of the raw bytes
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Remember where this frame starts. resetReaderIndex() without a prior mark rewinds to
        // index 0, which would replay bytes that were already decoded from the same buffer.
        final int frameStart = in.readerIndex();
        try {
            in.readByte(); // magic, not validated
            in.readByte(); // version, not validated
            short dataSize = in.readShort();
            byte[] data = new byte[dataSize];
            in.readBytes(data);
            out.add(serializer.deserialize(data, msgClass));
        } catch (Exception e) {
            // Decoding failed: pass the data on to the next decoder in the pipeline.
            log.error("msg decoder {}", e.getMessage());
            // Rewind to the start of this frame, because the reads above already consumed bytes.
            in.readerIndex(frameStart);
            // Take a retained duplicate so the skip below does not empty the forwarded message.
            ByteBuf buff = in.retainedDuplicate();
            // The decoder must consume the bytes it hands on, so skip everything that is left.
            in.skipBytes(in.readableBytes());
            out.add(buff);
        }
    }
}
// Caption: message protocol decoder
登錄協議相關
package protocol.system; import lombok.Getter; /** * @description: 登錄協議: |magic|version|code|data| * @author: quliang * @create: 2022-12-09 18:10 **/ public class LoginProtocol { @Getter private byte magic=0; @Getter private byte version=1; @Getter public byte code; }登錄協議基類
package protocol.system.request; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import protocol.system.LoginProtocol; /** * @description: * @author: quliang * @create: 2022-12-06 18:17 **/ @Data @NoArgsConstructor @AllArgsConstructor public class LoginRequest extends LoginProtocol { private String userId; private String userName; }登錄請求子類
package protocol.system.response; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import protocol.system.LoginProtocol; /** * @description: * @author: quliang * @create: 2022-12-06 18:22 **/ @Data @NoArgsConstructor @AllArgsConstructor public class LoginResponse extends LoginProtocol { private String msg; private String data; }登錄響應子類
package encoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import protocol.system.LoginProtocol; import serializer.KryoSerializer; /** * @description: * @author: quliang * @create: 2022-12-06 22:11 **/ public class LoginEncoder extends MessageToByteEncoder<LoginProtocol> { @Override protected void encode(ChannelHandlerContext ctx, LoginProtocol baseProtocol, ByteBuf in) throws Exception { in.writeByte(baseProtocol.getMagic()); in.writeByte(baseProtocol.getCode()); in.writeByte(baseProtocol.getVersion()); byte[] data = new KryoSerializer().serialize(baseProtocol); in.writeShort(data.length); in.writeBytes(data); } }登錄協議編碼
package decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import protocol.system.LoginProtocol;
import serializer.KryoSerializer;

import java.util.List;

/**
 * Decodes frames of the login protocol: magic (1 byte), code (1 byte), version (1 byte),
 * payload length (2 bytes), payload.
 *
 * <p>When a frame cannot be decoded, the undecoded bytes are handed on unchanged as a
 * {@link ByteBuf} so that a following decoder in the pipeline can try them.
 *
 * @author quliang
 * @since 2022-12-06
 */
@Slf4j
public class LoginDecoder extends ByteToMessageDecoder {

    /** Concrete login type the payload is deserialized into. */
    private final Class<? extends LoginProtocol> clazz;

    /** Stateless serializer used for every frame. */
    private final KryoSerializer serializer = new KryoSerializer();

    /**
     * @param clazz the concrete {@link LoginProtocol} type to deserialize payloads into
     */
    public LoginDecoder(Class<? extends LoginProtocol> clazz) {
        this.clazz = clazz;
    }

    /**
     * Tries to decode one frame from {@code in}; on failure forwards the raw bytes instead.
     *
     * @param ctx the handler context (unused)
     * @param in  the accumulated inbound bytes
     * @param out receives either the decoded message or a retained duplicate of the raw bytes
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Remember where this frame starts. resetReaderIndex() without a prior mark rewinds to
        // index 0, which would replay bytes that were already decoded from the same buffer.
        final int frameStart = in.readerIndex();
        try {
            in.readByte(); // magic, not validated
            in.readByte(); // code, not validated
            in.readByte(); // version, not validated
            short dataSize = in.readShort();
            byte[] data = new byte[dataSize];
            in.readBytes(data);
            out.add(serializer.deserialize(data, clazz));
        } catch (Exception e) {
            // Decoding failed: pass the data on to the next decoder in the pipeline.
            log.error("login decoder {}", e.getMessage());
            // Rewind to the start of this frame, because the reads above already consumed bytes.
            in.readerIndex(frameStart);
            // Take a retained duplicate so the skip below does not empty the forwarded message.
            ByteBuf buff = in.retainedDuplicate();
            // The decoder must consume the bytes it hands on, so skip everything that is left.
            in.skipBytes(in.readableBytes());
            out.add(buff);
        }
    }
}
// Caption: login protocol decoder
這樣公共模塊就創建完成了
創建服務端
package com.ql; import com.ql.handler.MsgHandler; import decoder.LoginDecoder; import decoder.MsgDecoder; import com.ql.handler.LoginHandler; import encoder.LoginEncoder; import encoder.MsgEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import protocol.system.request.LoginRequest; import protocol.msg.request.MsgRequest; /** * @author quliang * @description 服務端 * @date 2022-12-06 17:39:14 */ @Slf4j public class IotServer { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap().group( bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LoggingHandler(LogLevel.INFO)); /** * 心跳機制 */ //pipeline.addLast(new IdleStateHandler(5, 10, 5, TimeUnit.SECONDS)); /** * 消息、登錄解碼器 */ pipeline.addLast(new LoginDecoder(LoginRequest.class)); pipeline.addLast(new MsgDecoder(MsgRequest.class)); /** * 消息、登錄處理器 */ pipeline.addLast(new MsgHandler()); pipeline.addLast(new LoginHandler()); /** * 消息、登錄編碼器 */ pipeline.addLast(new MsgEncoder()); pipeline.addLast(new LoginEncoder()); } }) .option(ChannelOption.SO_BACKLOG, 1024); ChannelFuture cf = bootstrap.bind(8849).sync(); log.info("socket服務端啟動成功 {}", cf.channel().localAddress().toString()); cf.channel().closeFuture().sync(); 
} finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }服務端代碼
package com.ql.handler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import protocol.msg.request.MsgRequest; import protocol.msg.response.MsgResponse; /** * @description: 消息處理器 * @author: quliang * @create: 2022-12-10 20:57 **/ @Slf4j @ChannelHandler.Sharable public class MsgHandler extends SimpleChannelInboundHandler<MsgRequest> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("上線{}", ctx.channel().remoteAddress().toString()); } @Override protected void channelRead0(ChannelHandlerContext ctx, MsgRequest request) throws Exception { log.info("服務端讀取消息體數據為{}", request.toString()); MsgResponse response = new MsgResponse(); response.setStatCode(200); ctx.channel().writeAndFlush(response); } }服務端消息處理器
package com.ql.handler;

import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import protocol.system.request.LoginRequest;
import protocol.system.response.LoginResponse;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Server-side handler for {@link LoginRequest}: logs the request and replies with a
 * {@link LoginResponse}. It also reacts to {@link IdleStateEvent}s.
 *
 * @author quliang
 * @since 2022-12-06
 */
@Slf4j
@ChannelHandler.Sharable
public class LoginHandler extends SimpleChannelInboundHandler<LoginRequest> {

    /** Number of reader-idle events after which the channel is disconnected. */
    private static final int MAX_READER_IDLE_EVENTS = 3;

    /**
     * Count of reader-idle events seen so far.
     * NOTE(review): this counter is static, so it is shared by all channels and never reset;
     * confirm whether a per-channel count was intended.
     */
    private static final AtomicInteger READER_COUNT = new AtomicInteger(0);

    /** Logs that the channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("服務端:{} 通道開啟!", ctx.channel().localAddress().toString());
    }

    /** Logs that the channel became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("服務端: {} 通道關閉!", ctx.channel().localAddress().toString());
    }

    /**
     * Logs the login request and writes a "success" response to the channel.
     *
     * @param ctx          the handler context
     * @param loginRequest the decoded login request
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequest loginRequest) throws Exception {
        log.info("讀取數據 {} ", loginRequest.toString());
        LoginResponse response = new LoginResponse("success", null);
        ctx.channel().writeAndFlush(response);
    }

    /** Logs that the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("...............數據接收-完畢...............");
    }

    /**
     * Closes the channel and logs the failure with its stack trace.
     *
     * @param ctx   the handler context
     * @param cause the exception that reached this handler
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        // The throwable is passed as the final argument, so no "{}" placeholder is needed.
        log.error("...............業務處理異常...............", cause);
    }

    /**
     * Counts reader-idle events and disconnects the channel once more than
     * {@value #MAX_READER_IDLE_EVENTS} have been seen; every other event is passed on.
     *
     * @param ctx the handler context
     * @param evt the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            // Not an idle event: let the rest of the pipeline handle it.
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleStateEvent event = (IdleStateEvent) evt;
        switch (event.state()) {
            case READER_IDLE:
                log.info("讀空閑");
                READER_COUNT.incrementAndGet();
                break;
            case WRITER_IDLE:
                log.info("寫空閑");
                break;
            default:
                break;
        }

        // Disconnect only once the threshold is exceeded, not on every idle event.
        if (READER_COUNT.get() > MAX_READER_IDLE_EVENTS) {
            Channel channel = ctx.channel();
            log.info("close this channel {}", channel.remoteAddress().toString());
            ctx.disconnect();
        }
    }
}
// Caption: server-side login handler
服務端其實很多都是直接引用公共模塊的,代碼也並不複雜
創建消息客戶端
package com.ql; import com.ql.handler.ClientMsgHandler; import decoder.MsgDecoder; import encoder.MsgEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; import protocol.msg.response.MsgResponse; import java.net.InetSocketAddress; /** * @author quliang * @description 客戶端 * @date 2022-12-06 17:37:56 */ @Slf4j public class IotCli