Netty是建立在NIO基礎之上,Netty在NIO之上又提供了更高層次的抽象。 在Netty裡面,Accept連接可以使用單獨的線程池去處理,讀寫操作又是另外的線程池來處理。 Accept連接和讀寫操作也可以使用同一個線程池來進行處理。而請求處理邏輯既可以使用單獨的線程池進行處理,也可以跟放在讀寫 ...
Netty是建立在NIO基礎之上,Netty在NIO之上又提供了更高層次的抽象。
在Netty裡面,Accept連接可以使用單獨的線程池去處理,讀寫操作又是另外的線程池來處理。
Accept連接和讀寫操作也可以使用同一個線程池來進行處理。而請求處理邏輯既可以使用單獨的線程池進行處理,也可以跟放在讀寫線程一塊處理。線程池中的每一個線程都是NIO線程。用戶可以根據實際情況進行組裝,構造出滿足系統需求的併發模型。
Netty提供了內置的常用編解碼器,包括行編解碼器[一行一個請求],首碼長度編解碼器[前N個位元組定義請求的位元組長度],可重放解碼器[記錄半包消息的狀態],HTTP編解碼器,WebSocket消息編解碼器等等
Netty提供了一些列生命周期回調介面,當一個完整的請求到達時,當一個連接關閉時,當一個連接建立時,用戶都會收到回調事件,然後進行邏輯處理。
Netty可以同時管理多個埠,可以使用NIO客戶端模型,這些對於RPC服務是很有必要的。
Netty除了可以處理TCP Socket之外,還可以處理UDP Socket。
在消息讀寫過程中,需要大量使用ByteBuffer,Netty對ByteBuffer在性能和使用的便捷性上都進行了優化和抽象。
代碼:
服務端:
package com.kinson.netty.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * descripiton:服務端 * * @author: www.iknowba.cn * @date: 2018/3/23 * @time: 15:37 * @modifier: * @since: */ public class NettyServer { /** * 埠 */ private int port; public NettyServer(int port) { this.port = port; } public void run() { //EventLoopGroup是用來處理IO操作的多線程事件迴圈器 //負責接收客戶端連接線程 EventLoopGroup bossGroup = new NioEventLoopGroup(); //負責處理客戶端i/o事件、task任務、監聽任務組 EventLoopGroup workerGroup = new NioEventLoopGroup(); //啟動 NIO 服務的輔助啟動類 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); //配置 Channel bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ServerIniterHandler()); //BACKLOG用於構造服務端套接字ServerSocket對象, // 標識當伺服器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); //是否啟用心跳保活機制 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); try { //綁定服務埠監聽 Channel channel = bootstrap.bind(port).sync().channel(); System.out.println("server run in port " + port); //伺服器關閉監聽 /*channel.closeFuture().sync()實際是如何工作: channel.closeFuture()不做任何操作,只是簡單的返回channel對象中的closeFuture對象,對於每個Channel對象,都會有唯一的一個CloseFuture,用來表示關閉的Future, 所有執行channel.closeFuture().sync()就是執行的CloseFuturn的sync方法,從上面的解釋可以知道,這步是會將當前線程阻塞在CloseFuture上*/ channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //關閉事件流組 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { new NettyServer(8899).run(); } }
服務端業務邏輯處理:
package com.kinson.netty.server; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * descripiton: 伺服器的處理邏輯 * * @author: www.iknowba.cn * @date: 2018/3/23 * @time: 15:50 * @modifier: * @since: */ public class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 所有的活動用戶 */ public static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 讀取消息通道 * * @param context * @param s * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext context, String s) throws Exception { Channel channel = context.channel(); //當有用戶發送消息的時候,對其他的用戶發送消息 for (Channel ch : group) { if (ch == channel) { ch.writeAndFlush("[you]: " + s + "\n"); } else { ch.writeAndFlush("[" + channel.remoteAddress() + "]: " + s + "\n"); } } System.out.println("[" + channel.remoteAddress() + "]: " + s + "\n"); } /** * 處理新加的消息通道 * * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); for (Channel ch : group) { if (ch == channel) { ch.writeAndFlush("[" + channel.remoteAddress() + "] coming"); } } group.add(channel); } /** * 處理退出消息通道 * * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); for (Channel ch : group) { if (ch == channel) { ch.writeAndFlush("[" + channel.remoteAddress() + "] leaving"); } } group.remove(channel); } /** * 在建立連接時發送消息 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); boolean active = channel.isActive(); if (active) { System.out.println("[" + channel.remoteAddress() + "] is online"); } else { System.out.println("[" + channel.remoteAddress() + "] is offline"); } ctx.writeAndFlush("[server]: welcome"); } /** * 退出時發送消息 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); if (!channel.isActive()) { System.out.println("[" + channel.remoteAddress() + "] is offline"); } else { System.out.println("[" + channel.remoteAddress() + "] is online"); } } /** * 異常捕獲 * * @param ctx * @param e * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception { Channel channel = ctx.channel(); System.out.println("[" + channel.remoteAddress() + "] leave the room"); ctx.close().sync(); } }
服務端處理器註冊:
package com.kinson.netty.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * descripiton: 伺服器初始化 * * @author: www.iknowba.cn * @date: 2018/3/23 * @time: 15:46 * @modifier: * @since: */ public class ServerIniterHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //管道註冊handler ChannelPipeline pipeline = socketChannel.pipeline(); //編碼通道處理 pipeline.addLast("decode", new StringDecoder()); //轉碼通道處理 pipeline.addLast("encode", new StringEncoder()); //聊天服務通道處理 pipeline.addLast("chat", new ServerHandler()); } }
客戶端:
package com.kinson.netty.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.commons.lang3.StringUtils; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; /** * descripiton: 客戶端 * * @author: www.iknowba.cn * @date: 2018/3/23 * @time: 16:40 * @modifier: * @since: */ public class NettyClient { private String ip; private int port; private boolean stop = false; public NettyClient(String ip, int port) { this.ip = ip; this.port = port; } public void run() throws IOException { //設置一個多線程迴圈器 EventLoopGroup workerGroup = new NioEventLoopGroup(); //啟動附註類 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup); //指定所使用的NIO傳輸channel bootstrap.channel(NioSocketChannel.class); //指定客戶端初始化處理 bootstrap.handler(new ClientIniterHandler()); try { //連接服務 Channel channel = bootstrap.connect(ip, port).sync().channel(); while (true) { //向服務端發送內容 BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); String content = reader.readLine(); if (StringUtils.isNotEmpty(content)) { if (StringUtils.equalsIgnoreCase(content, "q")) { System.exit(1); } channel.writeAndFlush(content); } } } catch (InterruptedException e) { e.printStackTrace(); System.exit(1); } finally { workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new NettyClient("127.0.0.1", 8899).run(); } }
客戶端邏輯處理:
package com.kinson.netty.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * descripiton: 客戶端邏輯處理 * * @author: www.iknowba.cn * @date: 2018/3/23 * @time: 16:50 * @modifier: * @since: */ public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { //列印服務端的發送數據 System.out.println(s); } }
客戶端處理器註冊:
package com.kinson.netty.client; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * descripiton: 客戶端處理初始化 * * @author: www.iknowba.cn * @date: 2018/3/23 * @time: 16:55 * @modifier: * @since: */ public class ClientIniterHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //註冊管道 ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("http", new HttpClientCodec()); pipeline.addLast("chat", new ClientHandler()); } }
測試時先啟動服務端,再啟動客戶端。。。