Netty 系列一(核心組件和實例).

来源:https://www.cnblogs.com/jmcui/archive/2018/06/08/9154842.html
-Advertisement-
Play Games

一、概念 早期的 Java API 只支持由本地系統套接字型檔提供所謂的阻塞函數來支持網路編程。由於是阻塞 I/O ,要管理多個併發客戶端,需要為每個新的客戶端Socket 創建一個 Thread 。這將導致一系列的問題,第一,在任何時候都可能有大量的線程處於休眠狀態(不可能每時每刻都有對應的併發數) ...


一、概念

    早期的 Java API 只支持由本地系統套接字型檔提供所謂的阻塞函數來支持網路編程由於是阻塞 I/O ,要管理多個併發客戶端,需要為每個新的客戶端Socket 創建一個 Thread 。這將導致一系列的問題,第一,在任何時候都可能有大量的線程處於休眠狀態(不可能每時每刻都有對應的併發數);第二,需要為每個線程的調用棧都分配記憶體;第三,JVM 線上程的上下文切換所帶來的開銷會帶來麻煩。

    Java 在 2002 年引入了非阻塞 I/O,位於 JDK 1.4 的 java.nio 包中。class java.nio.channels.Selector 是Java 的非阻塞 I/O 實現的關鍵。它使用了事件通知以確定在一組非阻塞套接字中有哪些已經就緒能夠進行 I/O 相關的操作。因為可以在任何的時間檢查任意的讀操作或者寫操作的完成狀態,所以如圖 1-2 所示,一個單一的線程便可以處理多個併發的連接。

    

    儘管可以直接使用 Java NIO API,但是在高負載下可靠和高效地處理和調度 I/O 操作是一項繁瑣而且容易出錯的任務,最好還是留給高性能的網路編程專家——Netty。

    Netty 是一款非同步的事件驅動的網路應用程式框架,支持快速的開發可維護的高性能的瞄向協議的服務端和客戶端。它駕馭了Java高級API的能力,並將其隱藏在一個易於使用的API之後。首先,它的基於 Java NIO 的非同步的和事件驅動的實現,保證了高負載下應用程式性能的最大化和可伸縮性。其次, Netty 也包含了一組設計模式,將應用程式邏輯從網路層解耦,簡化了開發過程, 同時也最大限度地提高了可測試性、模塊化以及代碼的可重用性。

 

     tips:面向對象的基本概念—> 用較簡單的抽象隱藏底層實現的複雜性。

二、核心組件

  • Channel

    Channel是Java NIO的一個基本構造。可以看作是傳入或傳出數據的載體。因此,它可以被打開或關閉,連接或者斷開連接。以下是常用的Channel:

-- EmbeddedChannel
-- LocalServerChannel
-- NioDatagramChannel
-- NioSctpChannel
-- NioSocketChannel

  • 回調

    當一個回調被觸發時,相應的事件可以被一個interface-ChannelHandler的實現處理。

  • Future

    Netty中所有的I/O操作都是非同步的。因為一個操作可能不會立即返回,所以我們需要一種在之後的某個時間點確定其結果的方法。

    Future 和 回調 是相互補充的機制,提供了另一種在操作完成時通知應用程式的方式。這個對象可以看作是一個非同步操作結果的占位符;它將在未來的某個時刻完成,並提供對其結果的訪問。

    Netty 提供了ChannelFuture,用於在執行非同步操作的時候使用。每個Netty的出站I/O操作都會返回一個ChannelFuture。ChannelFuture能夠註冊一個或者多個ChannelFutureListener 實例。監聽器的回調方法operationComplete(),將會在對應的操作完成時被調用。

  • ChannelHandler

    Netty 的主要組件是ChannelHandler,它充當了所有處理入站和出站數據的應用程式邏輯的容器。

    Netty 使用不同的事件來通知我們狀態的改變或者是操作的狀態,每個事件都可以被分發給ChannelHandler類中某個用戶實現的方法。Netty提供了大量預定義的可以開箱即用的ChannelHandler實現,包括用於各種協議的ChannelHandler。

    現在,事件可以被分發給ChannelHandler類中某個用戶實現的方法。那麼,如果 ChannelHandler 處理完成後不直接返回給客戶端,而是傳遞給下一個ChannelHandler 繼續處理呢?那麼就要說到 ChannelPipeline !

    ChannelPipeline 提供了 ChannelHandler鏈 的容器,並定義了用於在該鏈上傳播入站和出站事件流的API。使得事件流經 ChannelPipeline 是 ChannelHandler 的工作,它們是在應用程式的初始化或者引導階段被安裝的。這些對象接收事件、執行他們所實現的處理邏輯,並將數據傳遞給鏈中的下一個ChannelHandler:

1、一個ChannelInitializer的實現被註冊到了ServerBootstrap中。
2、當 ChannelInitializer.initChannel()方法被調用時, ChannelInitializer將在 ChannelPipeline 中安裝一組自定義的 ChannelHandler。
3、ChannelInitializer 將它自己從 ChannelPipeline 中移除。

  • EventLoop

    EventLoop 定義了Netty的核心抽象,用來處理連接的生命周期中所發生的事件,在內部,將會為每個Channel分配一個EventLoop。

    EventLoop本身只由一個線程驅動,其處理了一個Channel的所有I/O事件,並且在該EventLoop的整個生命周期內都不會改變。這個簡單而強大的設計消除了你可能有的在ChannelHandler實現中需要進行同步的任何顧慮。

    這裡需要說到,EventLoop的管理是通過EventLoopGroup來實現的。還要一點要註意的是,客戶端引導類是 Bootstrap,只需要一個EventLoopGroup。服務端引導類是 ServerBootstrap,通常需要兩個 EventLoopGroup,一個用來接收客戶端連接,一個用來處理 I/O 事件(也可以只使用一個 EventLoopGroup,此時其將在兩個場景下共用同一個 EventLoopGroup)。

1、一個 EventLoopGroup 包含一個或者多個 EventLoop;
2、一個 EventLoop 在它的生命周期內只和一個 Thread 綁定;
3、所有由 EventLoop 處理的 I/O 事件都將在它專有的Thread 上被處理;
4、一個 Channel 在它的生命周期內只註冊於一個EventLoop;
5、一個 EventLoop 可能會被分配給一個或者多個 Channel(面對多個Channel,一個 EventLoop 按照事件觸發,順序執行)。

三、實例

    所有的Netty服務端/客戶端都至少需要兩個部分:

1、至少一個ChannelHandler —— 該組件實現了對數據的處理。

2、引導 —— 這是配置伺服器的啟動代碼。

    服務端:

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //1、創建EventLoopGroup以進行事件的處理,如接受新連接以及讀/寫數據
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //2、創建ServerBootstrap,引導和綁定伺服器
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, group)
                    //3、指定所使用的NIO傳輸Channel
                    .channel(NioServerSocketChannel.class)
                    //4、使用指定的埠設置套接字地址
                    .localAddress(new InetSocketAddress(port))
                    //5、添加一個 EchoServerHandler 到子 Channel的 ChannelPipeline
                    //當一個新的連接被接受時,一個新的子Channel將會被創建,而 ChannelInitializer 將會把一個你的EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(serverHandler);
                        }
                    });
            //6、非同步地綁定伺服器,調用sync()方法阻塞等待直到綁定完成
            ChannelFuture channelFuture = bootstrap.bind().sync();
            System.out.println(EchoServer.class.getName() + "started and listening for connections on" + channelFuture.channel().localAddress());
            //7、獲取 Channel 的 CloseFuture,並且阻塞當前線程直到它完成
            channelFuture.channel().closeFuture().sync();

        } finally {
            //8、關閉 EventLoopGroup 釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoServer(9999).start();
    }
}
@ChannelHandler.Sharable //標識一個Channel-Handler 可以被多個Channel安全的共用
public class EchoServerHandler extends ChannelHandlerAdapter {


    /**
     * 對於每個傳入的消息都要調用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
        //將接收到的消息寫給發送者,而不沖刷出站消息
        //ChannelHandlerContext 發送消息。導致消息向下一個ChannelHandler流動
        //Channel 發送消息將會導致消息從 ChannelPipeline的尾端開始流動
        ctx.write(in);
    }

    /**
     * 通知 ChannelHandlerAdapter 最後一次對channel-Read()的調用是當前批量讀取中的最後一條消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //暫存於ChannelOutboundBuffer中的消息,在下一次調用flush()或者writeAndFlush()方法時將會嘗試寫出到套接字
        //將這份暫存消息沖刷到遠程節點,並且關閉該Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 在讀取操作期間,有異常拋出時會調用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}
EchoServerHandler.java

 

    客戶端:

public class EchoClient {

    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //指定 EventLoopGroup 以處理客戶端事件;適應於NIO的實現
            bootstrap.group(group)
                    //適用於NIO傳輸的Channel類型
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    //在創建Channel時,向ChannelPipeline中添加一個EchoClientHandler實例
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //連接到遠程節點,阻塞等待直到連接完成
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞,直到Channel 關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //關閉線程池並且釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();
    }


}
@ChannelHandler.Sharable //標記該類的實例可以被多個Channel共用
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 當從伺服器接收到一條消息時被調用
     *
     * @param ctx
     * @param msg ByteBuf (Netty 的位元組容器) 作為一個面向流的協議,TCP 保證了位元組數組將會按照伺服器發送它們的順序接收
     * @throws Exception
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client" + ctx.channel().remoteAddress() + "connected");
        System.out.println(msg.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在到伺服器的連接已經建立之後將被調用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx)  {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
    }


    /**
     * 在處理過程中引發異常時被調用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}
EchoClientHandler.java

四、結語

    帶著一陣迷糊就開始了Netty學習之旅,學到現在還是對Netty一堆專有名詞頭大!沒辦法,只好硬著頭皮學下去了,畢竟,熟讀唐詩三百首,不會作詩也會吟嘛!

    來總結下,一個Netty服務端處理客戶端連接的過程:

1、創建一個channel同該用戶端進行綁定;
2、channel從EventLoopGroup獲得一個EventLoop,並註冊到該EventLoop,channel生命周期內都和該EventLoop在一起(註冊時獲得selectionKey);
3、channel同用戶端進行網路連接、關閉和讀寫,生成相對應的event(改變selectinKey信息),觸發eventloop調度線程進行執行;
4、ChannelPipeline 找到對應 ChannelHandler 方法處理用戶邏輯。

    我們項目中使用的 Netty 服務端啟動類:

public class NettyServer {

    public static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    private static Integer LISTENER_PORT = PropertiesLoader.getResourcesLoader().getInteger("nettyPort");



    private int port;
    EventLoopGroup boss = null;
    EventLoopGroup worker = null;
    ServerBootstrap serverBootstrap = null;

    public static NettyServer nettyServer = null;

    public static NettyServer getInstance() {
        if (nettyServer == null) {
            synchronized (NettyServer.class) {
                if (nettyServer == null) {
                    nettyServer = new NettyServer(LISTENER_PORT==null?9999:LISTENER_PORT);
                }
            }
        }
        return nettyServer;
    }

    /**
     * 構造函數
     *
     * @param port 埠
     */
    private NettyServer(int port) {
        this.port = port;

    }

    /**
     * 綁定
     *
     * @throws InterruptedException
     */
    public void init() throws InterruptedException {
        try {

            //創建兩個線程池
            //目前伺服器CPU為單核8線程,調整線程為8
            boss = new NioEventLoopGroup(8);
            worker = new NioEventLoopGroup(8);

            serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker);//兩個工作線程
            serverBootstrap.channel(NioServerSocketChannel.class);
            //重用緩衝區
            serverBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            //自動調整下一次緩衝區建立時分配的空間大小,避免記憶體的浪費
            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
            //當伺服器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度,預設值50。
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            //用於啟用或關於Nagle演算法。如果要求高實時性,有數據發送時就馬上發送,就將該選項設置為true關閉Nagle演算法;如果要減少發送次數減少網路交互,就設置為false等累積一定大小後再發送。預設為false。
            serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
            //是否啟用心跳保活機制
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            //支持tcp協議
            //bootstrap.childHandler(new TcpChannelInitializer());

            //支持webSocket協議
            serverBootstrap.childHandler(new WebSocketChannelInitializer());
            ChannelFuture f = serverBootstrap.bind(port).sync();
            if (f.isSuccess()) {
                logger.info("netty server start...");
            }
            //等到服務端監聽埠關閉
            f.channel().closeFuture().sync();
        } finally {
            //優雅釋放線程資源
            boss.shutdownGracefully().sync();
            worker.shutdownGracefully().sync();
        }
    }

    /**
     * 銷毀netty相關資源
     */
    public void destroy() {
        try {
            if (boss != null) {
                boss.shutdownGracefully();
            }
            if (worker != null) {
                worker.shutdownGracefully();
            }
            if (serverBootstrap != null) {
                serverBootstrap = null;
            }
        } catch (Exception e) {
            logger.error("netty close err:" + e.getMessage(), e);
        }
    }
}
NettyServer.java

 

參考資料:《Netty IN ACTION》

演示源代碼:https://github.com/JMCuixy/NettyDemo


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 就是想記錄一下 JSP中傳參的四種方法: 1、form表單 2、request.setAttribute();和request.getAttribute(); 3、超鏈接:<a herf="index.jsp"?a=a&b=b&c=c>name</a> 4、<jsp:param> 發生Referen ...
  • python從2.6開始支持format,新的更加容易讀懂的字元串格式化方法, 從原來的% 模式變成新的可讀性更強的 綜合舉例說明: 輸入: '{:>18,.2f}'.format(70305084.0) # :冒號+空白填充+右對齊+固定寬度18+浮點精度.2+浮點數聲明f 輸出:' 70,305 ...
  • 原創 三羊獻瑞 觀察下麵的加法算式: 祥 瑞 生 輝 + 三 羊 獻 瑞 三 羊 生 瑞 氣 (如果有對齊問題,可以參看【圖1.jpg】) 其中,相同的漢字代表相同的數字,不同的漢字代表不同的數字。 請你填寫“三羊獻瑞”所代表的4位數字(答案唯一),不要填寫任何多餘內容。 分析:三羊生瑞氣這個數中三 ...
  • 13.1點擊更換圖形驗證碼 (1)front/signup.html (2)static/front/css/signup.css body { background: #f3f3f3; } .outer-box { width: 854px; background: #fff; margin: 0 ...
  • realloc 用方法 void realloc(void , n) 根據n的大小,如果n比較小,就沿用原來的記憶體地址(也就是返回的地址就是原來的地址),在原來地址的記憶體空間的最後面,加上n大小的記憶體空間;如果n比較大,系統就不會沿用原來的記憶體地址,系統有新開闢一個記憶體空間,並把原來記憶體空間里存放的 ...
  • Description 下麵的程式輸出結果是: A::Fun A::Do A::Fun C::Do 請填空: 程式代碼如下 ~~~~ include using namespace std; class A { private: int nVal; public: void Fun() { cout ...
  • 當我們瀏覽網頁的時候,經常會看到像下麵這些好看的圖片,你是否想把這些圖片保存下載下來。 我們最常規的做法就是通過滑鼠右鍵,選擇另存為。但有些圖片點擊滑鼠右鍵的時候並沒有另存為選項,或者你可以通過截圖工具截取下來,但這樣就降低圖片的清晰度,並且這樣效率很低。 那腫麽辦呢? 我們可以通過python 來 ...
  • Description 下麵程式的輸出結果是: destructor B destructor A 請完整寫出 class A。 限制條件:不得為 class A 編寫構造函數。 ~~~~ include using namespace std; class A { // Your Code Her ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...