一、概念 早期的 Java API 只支持由本地系統套接字型檔提供所謂的阻塞函數來支持網路編程。由於是阻塞 I/O ,要管理多個併發客戶端,需要為每個新的客戶端Socket 創建一個 Thread 。這將導致一系列的問題,第一,在任何時候都可能有大量的線程處於休眠狀態(不可能每時每刻都有對應的併發數) ...
一、概念
早期的 Java API 只支持由本地系統套接字型檔提供所謂的阻塞函數來支持網路編程。由於是阻塞 I/O ,要管理多個併發客戶端,需要為每個新的客戶端Socket 創建一個 Thread 。這將導致一系列的問題,第一,在任何時候都可能有大量的線程處於休眠狀態(不可能每時每刻都有對應的併發數);第二,需要為每個線程的調用棧都分配記憶體;第三,JVM 線上程的上下文切換所帶來的開銷會帶來麻煩。
Java 在 2002 年引入了非阻塞 I/O,位於 JDK 1.4 的 java.nio 包中。class java.nio.channels.Selector 是Java 的非阻塞 I/O 實現的關鍵。它使用了事件通知以確定在一組非阻塞套接字中有哪些已經就緒能夠進行 I/O 相關的操作。因為可以在任何的時間檢查任意的讀操作或者寫操作的完成狀態,所以如圖 1-2 所示,一個單一的線程便可以處理多個併發的連接。
儘管可以直接使用 Java NIO API,但是在高負載下可靠和高效地處理和調度 I/O 操作是一項繁瑣而且容易出錯的任務,最好還是留給高性能的網路編程專家——Netty。
Netty 是一款非同步的事件驅動的網路應用程式框架,支持快速的開發可維護的高性能的瞄向協議的服務端和客戶端。它駕馭了Java高級API的能力,並將其隱藏在一個易於使用的API之後。首先,它的基於 Java NIO 的非同步的和事件驅動的實現,保證了高負載下應用程式性能的最大化和可伸縮性。其次, Netty 也包含了一組設計模式,將應用程式邏輯從網路層解耦,簡化了開發過程, 同時也最大限度地提高了可測試性、模塊化以及代碼的可重用性。
tips:面向對象的基本概念—> 用較簡單的抽象隱藏底層實現的複雜性。
二、核心組件
- Channel
Channel是Java NIO的一個基本構造。可以看作是傳入或傳出數據的載體。因此,它可以被打開或關閉,連接或者斷開連接。以下是常用的Channel:
-- EmbeddedChannel
-- LocalServerChannel
-- NioDatagramChannel
-- NioSctpChannel
-- NioSocketChannel
- 回調
當一個回調被觸發時,相應的事件可以被一個interface-ChannelHandler的實現處理。
- Future
Netty中所有的I/O操作都是非同步的。因為一個操作可能不會立即返回,所以我們需要一種在之後的某個時間點確定其結果的方法。
Future 和 回調 是相互補充的機制,提供了另一種在操作完成時通知應用程式的方式。這個對象可以看作是一個非同步操作結果的占位符;它將在未來的某個時刻完成,並提供對其結果的訪問。
Netty 提供了ChannelFuture,用於在執行非同步操作的時候使用。每個Netty的出站I/O操作都會返回一個ChannelFuture。ChannelFuture能夠註冊一個或者多個ChannelFutureListener 實例。監聽器的回調方法operationComplete(),將會在對應的操作完成時被調用。
- ChannelHandler
Netty 的主要組件是ChannelHandler,它充當了所有處理入站和出站數據的應用程式邏輯的容器。
Netty 使用不同的事件來通知我們狀態的改變或者是操作的狀態,每個事件都可以被分發給ChannelHandler類中某個用戶實現的方法。Netty提供了大量預定義的可以開箱即用的ChannelHandler實現,包括用於各種協議的ChannelHandler。
現在,事件可以被分發給ChannelHandler類中某個用戶實現的方法。那麼,如果 ChannelHandler 處理完成後不直接返回給客戶端,而是傳遞給下一個ChannelHandler 繼續處理呢?那麼就要說到 ChannelPipeline !
ChannelPipeline 提供了 ChannelHandler鏈 的容器,並定義了用於在該鏈上傳播入站和出站事件流的API。使得事件流經 ChannelPipeline 是 ChannelHandler 的工作,它們是在應用程式的初始化或者引導階段被安裝的。這些對象接收事件、執行他們所實現的處理邏輯,並將數據傳遞給鏈中的下一個ChannelHandler:
1、一個ChannelInitializer的實現被註冊到了ServerBootstrap中。
2、當 ChannelInitializer.initChannel()方法被調用時, ChannelInitializer將在 ChannelPipeline 中安裝一組自定義的 ChannelHandler。
3、ChannelInitializer 將它自己從 ChannelPipeline 中移除。
- EventLoop
EventLoop 定義了Netty的核心抽象,用來處理連接的生命周期中所發生的事件,在內部,將會為每個Channel分配一個EventLoop。
EventLoop本身只由一個線程驅動,其處理了一個Channel的所有I/O事件,並且在該EventLoop的整個生命周期內都不會改變。這個簡單而強大的設計消除了你可能有的在ChannelHandler實現中需要進行同步的任何顧慮。
這裡需要說到,EventLoop的管理是通過EventLoopGroup來實現的。還要一點要註意的是,客戶端引導類是 Bootstrap,只需要一個EventLoopGroup。服務端引導類是 ServerBootstrap,通常需要兩個 EventLoopGroup,一個用來接收客戶端連接,一個用來處理 I/O 事件(也可以只使用一個 EventLoopGroup,此時其將在兩個場景下共用同一個 EventLoopGroup)。
1、一個 EventLoopGroup 包含一個或者多個 EventLoop;
2、一個 EventLoop 在它的生命周期內只和一個 Thread 綁定;
3、所有由 EventLoop 處理的 I/O 事件都將在它專有的Thread 上被處理;
4、一個 Channel 在它的生命周期內只註冊於一個EventLoop;
5、一個 EventLoop 可能會被分配給一個或者多個 Channel(面對多個Channel,一個 EventLoop 按照事件觸發,順序執行)。
三、實例
所有的Netty服務端/客戶端都至少需要兩個部分:
1、至少一個ChannelHandler —— 該組件實現了對數據的處理。
2、引導 —— 這是配置伺服器的啟動代碼。
服務端:
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws InterruptedException { final EchoServerHandler serverHandler = new EchoServerHandler(); //1、創建EventLoopGroup以進行事件的處理,如接受新連接以及讀/寫數據 EventLoopGroup group = new NioEventLoopGroup(); try { //2、創建ServerBootstrap,引導和綁定伺服器 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group, group) //3、指定所使用的NIO傳輸Channel .channel(NioServerSocketChannel.class) //4、使用指定的埠設置套接字地址 .localAddress(new InetSocketAddress(port)) //5、添加一個 EchoServerHandler 到子 Channel的 ChannelPipeline //當一個新的連接被接受時,一個新的子Channel將會被創建,而 ChannelInitializer 將會把一個你的EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(serverHandler); } }); //6、非同步地綁定伺服器,調用sync()方法阻塞等待直到綁定完成 ChannelFuture channelFuture = bootstrap.bind().sync(); System.out.println(EchoServer.class.getName() + "started and listening for connections on" + channelFuture.channel().localAddress()); //7、獲取 Channel 的 CloseFuture,並且阻塞當前線程直到它完成 channelFuture.channel().closeFuture().sync(); } finally { //8、關閉 EventLoopGroup 釋放所有的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { new EchoServer(9999).start(); } }
@ChannelHandler.Sharable //標識一個Channel-Handler 可以被多個Channel安全的共用 public class EchoServerHandler extends ChannelHandlerAdapter { /** * 對於每個傳入的消息都要調用 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8)); //將接收到的消息寫給發送者,而不沖刷出站消息 //ChannelHandlerContext 發送消息。導致消息向下一個ChannelHandler流動 //Channel 發送消息將會導致消息從 ChannelPipeline的尾端開始流動 ctx.write(in); } /** * 通知 ChannelHandlerAdapter 最後一次對channel-Read()的調用是當前批量讀取中的最後一條消息 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //暫存於ChannelOutboundBuffer中的消息,在下一次調用flush()或者writeAndFlush()方法時將會嘗試寫出到套接字 //將這份暫存消息沖刷到遠程節點,並且關閉該Channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } /** * 在讀取操作期間,有異常拋出時會調用 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }EchoServerHandler.java
客戶端:
public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { //創建Bootstrap Bootstrap bootstrap = new Bootstrap(); //指定 EventLoopGroup 以處理客戶端事件;適應於NIO的實現 bootstrap.group(group) //適用於NIO傳輸的Channel類型 .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) //在創建Channel時,向ChannelPipeline中添加一個EchoClientHandler實例 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); //連接到遠程節點,阻塞等待直到連接完成 ChannelFuture channelFuture = bootstrap.connect().sync(); //阻塞,直到Channel 關閉 channelFuture.channel().closeFuture().sync(); } finally { //關閉線程池並且釋放所有的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { new EchoClient("127.0.0.1", 9999).start(); System.out.println("------------------------------------"); new EchoClient("127.0.0.1", 9999).start(); System.out.println("------------------------------------"); new EchoClient("127.0.0.1", 9999).start(); } }
@ChannelHandler.Sharable //標記該類的實例可以被多個Channel共用 public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 當從伺服器接收到一條消息時被調用 * * @param ctx * @param msg ByteBuf (Netty 的位元組容器) 作為一個面向流的協議,TCP 保證了位元組數組將會按照伺服器發送它們的順序接收 * @throws Exception */ @Override protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("Client" + ctx.channel().remoteAddress() + "connected"); System.out.println(msg.toString(CharsetUtil.UTF_8)); } /** * 在到伺服器的連接已經建立之後將被調用 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8)); } /** * 在處理過程中引發異常時被調用 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }EchoClientHandler.java
四、結語
帶著一陣迷糊就開始了Netty學習之旅,學到現在還是對Netty一堆專有名詞頭大!沒辦法,只好硬著頭皮學下去了,畢竟,熟讀唐詩三百首,不會作詩也會吟嘛!
來總結下,一個Netty服務端處理客戶端連接的過程:
1、創建一個channel同該用戶端進行綁定;
2、channel從EventLoopGroup獲得一個EventLoop,並註冊到該EventLoop,channel生命周期內都和該EventLoop在一起(註冊時獲得selectionKey);
3、channel同用戶端進行網路連接、關閉和讀寫,生成相對應的event(改變selectinKey信息),觸發eventloop調度線程進行執行;
4、ChannelPipeline 找到對應 ChannelHandler 方法處理用戶邏輯。
我們項目中使用的 Netty 服務端啟動類:
public class NettyServer { public static final Logger logger = LoggerFactory.getLogger(NettyServer.class); private static Integer LISTENER_PORT = PropertiesLoader.getResourcesLoader().getInteger("nettyPort"); private int port; EventLoopGroup boss = null; EventLoopGroup worker = null; ServerBootstrap serverBootstrap = null; public static NettyServer nettyServer = null; public static NettyServer getInstance() { if (nettyServer == null) { synchronized (NettyServer.class) { if (nettyServer == null) { nettyServer = new NettyServer(LISTENER_PORT==null?9999:LISTENER_PORT); } } } return nettyServer; } /** * 構造函數 * * @param port 埠 */ private NettyServer(int port) { this.port = port; } /** * 綁定 * * @throws InterruptedException */ public void init() throws InterruptedException { try { //創建兩個線程池 //目前伺服器CPU為單核8線程,調整線程為8 boss = new NioEventLoopGroup(8); worker = new NioEventLoopGroup(8); serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker);//兩個工作線程 serverBootstrap.channel(NioServerSocketChannel.class); //重用緩衝區 serverBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); //自動調整下一次緩衝區建立時分配的空間大小,避免記憶體的浪費 serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT); //當伺服器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度,預設值50。 serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); //用於啟用或關於Nagle演算法。如果要求高實時性,有數據發送時就馬上發送,就將該選項設置為true關閉Nagle演算法;如果要減少發送次數減少網路交互,就設置為false等累積一定大小後再發送。預設為false。 serverBootstrap.option(ChannelOption.TCP_NODELAY, true); //是否啟用心跳保活機制 serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); //支持tcp協議 //bootstrap.childHandler(new TcpChannelInitializer()); //支持webSocket協議 serverBootstrap.childHandler(new WebSocketChannelInitializer()); ChannelFuture f = serverBootstrap.bind(port).sync(); if (f.isSuccess()) { logger.info("netty server start..."); } //等到服務端監聽埠關閉 f.channel().closeFuture().sync(); } finally { //優雅釋放線程資源 boss.shutdownGracefully().sync(); worker.shutdownGracefully().sync(); } } /** * 銷毀netty相關資源 */ public void destroy() { try { if (boss != null) { boss.shutdownGracefully(); } if (worker != null) { worker.shutdownGracefully(); } if (serverBootstrap != null) { serverBootstrap = null; } } catch (Exception e) { logger.error("netty close err:" + e.getMessage(), e); } } }NettyServer.java
參考資料:《Netty IN ACTION》
演示源代碼:https://github.com/JMCuixy/NettyDemo