Netty 系列一(核心組件和實例).

来源:https://www.cnblogs.com/jmcui/archive/2018/06/08/9154842.html
-Advertisement-
Play Games

一、概念 早期的 Java API 只支持由本地系統套接字型檔提供所謂的阻塞函數來支持網路編程。由於是阻塞 I/O ,要管理多個併發客戶端,需要為每個新的客戶端Socket 創建一個 Thread 。這將導致一系列的問題,第一,在任何時候都可能有大量的線程處於休眠狀態(不可能每時每刻都有對應的併發數) ...


一、概念

    早期的 Java API 只支持由本地系統套接字型檔提供所謂的阻塞函數來支持網路編程由於是阻塞 I/O ,要管理多個併發客戶端,需要為每個新的客戶端Socket 創建一個 Thread 。這將導致一系列的問題,第一,在任何時候都可能有大量的線程處於休眠狀態(不可能每時每刻都有對應的併發數);第二,需要為每個線程的調用棧都分配記憶體;第三,JVM 線上程的上下文切換所帶來的開銷會帶來麻煩。

    Java 在 2002 年引入了非阻塞 I/O,位於 JDK 1.4 的 java.nio 包中。class java.nio.channels.Selector 是Java 的非阻塞 I/O 實現的關鍵。它使用了事件通知以確定在一組非阻塞套接字中有哪些已經就緒能夠進行 I/O 相關的操作。因為可以在任何的時間檢查任意的讀操作或者寫操作的完成狀態,所以如圖 1-2 所示,一個單一的線程便可以處理多個併發的連接。

    

    儘管可以直接使用 Java NIO API,但是在高負載下可靠和高效地處理和調度 I/O 操作是一項繁瑣而且容易出錯的任務,最好還是留給高性能的網路編程專家——Netty。

    Netty 是一款非同步的事件驅動的網路應用程式框架,支持快速的開發可維護的高性能的瞄向協議的服務端和客戶端。它駕馭了Java高級API的能力,並將其隱藏在一個易於使用的API之後。首先,它的基於 Java NIO 的非同步的和事件驅動的實現,保證了高負載下應用程式性能的最大化和可伸縮性。其次, Netty 也包含了一組設計模式,將應用程式邏輯從網路層解耦,簡化了開發過程, 同時也最大限度地提高了可測試性、模塊化以及代碼的可重用性。

 

     tips:面向對象的基本概念—> 用較簡單的抽象隱藏底層實現的複雜性。

二、核心組件

  • Channel

    Channel是Java NIO的一個基本構造。可以看作是傳入或傳出數據的載體。因此,它可以被打開或關閉,連接或者斷開連接。以下是常用的Channel:

-- EmbeddedChannel
-- LocalServerChannel
-- NioDatagramChannel
-- NioSctpChannel
-- NioSocketChannel

  • 回調

    當一個回調被觸發時,相應的事件可以被一個interface-ChannelHandler的實現處理。

  • Future

    Netty中所有的I/O操作都是非同步的。因為一個操作可能不會立即返回,所以我們需要一種在之後的某個時間點確定其結果的方法。

    Future 和 回調 是相互補充的機制,提供了另一種在操作完成時通知應用程式的方式。這個對象可以看作是一個非同步操作結果的占位符;它將在未來的某個時刻完成,並提供對其結果的訪問。

    Netty 提供了ChannelFuture,用於在執行非同步操作的時候使用。每個Netty的出站I/O操作都會返回一個ChannelFuture。ChannelFuture能夠註冊一個或者多個ChannelFutureListener 實例。監聽器的回調方法operationComplete(),將會在對應的操作完成時被調用。

  • ChannelHandler

    Netty 的主要組件是ChannelHandler,它充當了所有處理入站和出站數據的應用程式邏輯的容器。

    Netty 使用不同的事件來通知我們狀態的改變或者是操作的狀態,每個事件都可以被分發給ChannelHandler類中某個用戶實現的方法。Netty提供了大量預定義的可以開箱即用的ChannelHandler實現,包括用於各種協議的ChannelHandler。

    現在,事件可以被分發給ChannelHandler類中某個用戶實現的方法。那麼,如果 ChannelHandler 處理完成後不直接返回給客戶端,而是傳遞給下一個ChannelHandler 繼續處理呢?那麼就要說到 ChannelPipeline !

    ChannelPipeline 提供了 ChannelHandler鏈 的容器,並定義了用於在該鏈上傳播入站和出站事件流的API。使得事件流經 ChannelPipeline 是 ChannelHandler 的工作,它們是在應用程式的初始化或者引導階段被安裝的。這些對象接收事件、執行他們所實現的處理邏輯,並將數據傳遞給鏈中的下一個ChannelHandler:

1、一個ChannelInitializer的實現被註冊到了ServerBootstrap中。
2、當 ChannelInitializer.initChannel()方法被調用時, ChannelInitializer將在 ChannelPipeline 中安裝一組自定義的 ChannelHandler。
3、ChannelInitializer 將它自己從 ChannelPipeline 中移除。

  • EventLoop

    EventLoop 定義了Netty的核心抽象,用來處理連接的生命周期中所發生的事件,在內部,將會為每個Channel分配一個EventLoop。

    EventLoop本身只由一個線程驅動,其處理了一個Channel的所有I/O事件,並且在該EventLoop的整個生命周期內都不會改變。這個簡單而強大的設計消除了你可能有的在ChannelHandler實現中需要進行同步的任何顧慮。

    這裡需要說到,EventLoop的管理是通過EventLoopGroup來實現的。還要一點要註意的是,客戶端引導類是 Bootstrap,只需要一個EventLoopGroup。服務端引導類是 ServerBootstrap,通常需要兩個 EventLoopGroup,一個用來接收客戶端連接,一個用來處理 I/O 事件(也可以只使用一個 EventLoopGroup,此時其將在兩個場景下共用同一個 EventLoopGroup)。

1、一個 EventLoopGroup 包含一個或者多個 EventLoop;
2、一個 EventLoop 在它的生命周期內只和一個 Thread 綁定;
3、所有由 EventLoop 處理的 I/O 事件都將在它專有的Thread 上被處理;
4、一個 Channel 在它的生命周期內只註冊於一個EventLoop;
5、一個 EventLoop 可能會被分配給一個或者多個 Channel(面對多個Channel,一個 EventLoop 按照事件觸發,順序執行)。

三、實例

    所有的Netty服務端/客戶端都至少需要兩個部分:

1、至少一個ChannelHandler —— 該組件實現了對數據的處理。

2、引導 —— 這是配置伺服器的啟動代碼。

    服務端:

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //1、創建EventLoopGroup以進行事件的處理,如接受新連接以及讀/寫數據
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //2、創建ServerBootstrap,引導和綁定伺服器
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, group)
                    //3、指定所使用的NIO傳輸Channel
                    .channel(NioServerSocketChannel.class)
                    //4、使用指定的埠設置套接字地址
                    .localAddress(new InetSocketAddress(port))
                    //5、添加一個 EchoServerHandler 到子 Channel的 ChannelPipeline
                    //當一個新的連接被接受時,一個新的子Channel將會被創建,而 ChannelInitializer 將會把一個你的EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(serverHandler);
                        }
                    });
            //6、非同步地綁定伺服器,調用sync()方法阻塞等待直到綁定完成
            ChannelFuture channelFuture = bootstrap.bind().sync();
            System.out.println(EchoServer.class.getName() + "started and listening for connections on" + channelFuture.channel().localAddress());
            //7、獲取 Channel 的 CloseFuture,並且阻塞當前線程直到它完成
            channelFuture.channel().closeFuture().sync();

        } finally {
            //8、關閉 EventLoopGroup 釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoServer(9999).start();
    }
}
@ChannelHandler.Sharable //標識一個Channel-Handler 可以被多個Channel安全的共用
public class EchoServerHandler extends ChannelHandlerAdapter {


    /**
     * 對於每個傳入的消息都要調用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
        //將接收到的消息寫給發送者,而不沖刷出站消息
        //ChannelHandlerContext 發送消息。導致消息向下一個ChannelHandler流動
        //Channel 發送消息將會導致消息從 ChannelPipeline的尾端開始流動
        ctx.write(in);
    }

    /**
     * 通知 ChannelHandlerAdapter 最後一次對channel-Read()的調用是當前批量讀取中的最後一條消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //暫存於ChannelOutboundBuffer中的消息,在下一次調用flush()或者writeAndFlush()方法時將會嘗試寫出到套接字
        //將這份暫存消息沖刷到遠程節點,並且關閉該Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 在讀取操作期間,有異常拋出時會調用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}
EchoServerHandler.java

 

    客戶端:

public class EchoClient {

    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //指定 EventLoopGroup 以處理客戶端事件;適應於NIO的實現
            bootstrap.group(group)
                    //適用於NIO傳輸的Channel類型
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    //在創建Channel時,向ChannelPipeline中添加一個EchoClientHandler實例
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //連接到遠程節點,阻塞等待直到連接完成
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞,直到Channel 關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //關閉線程池並且釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();
    }


}
@ChannelHandler.Sharable //標記該類的實例可以被多個Channel共用
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 當從伺服器接收到一條消息時被調用
     *
     * @param ctx
     * @param msg ByteBuf (Netty 的位元組容器) 作為一個面向流的協議,TCP 保證了位元組數組將會按照伺服器發送它們的順序接收
     * @throws Exception
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client" + ctx.channel().remoteAddress() + "connected");
        System.out.println(msg.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在到伺服器的連接已經建立之後將被調用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx)  {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
    }


    /**
     * 在處理過程中引發異常時被調用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}
EchoClientHandler.java

四、結語

    帶著一陣迷糊就開始了Netty學習之旅,學到現在還是對Netty一堆專有名詞頭大!沒辦法,只好硬著頭皮學下去了,畢竟,熟讀唐詩三百首,不會作詩也會吟嘛!

    來總結下,一個Netty服務端處理客戶端連接的過程:

1、創建一個channel同該用戶端進行綁定;
2、channel從EventLoopGroup獲得一個EventLoop,並註冊到該EventLoop,channel生命周期內都和該EventLoop在一起(註冊時獲得selectionKey);
3、channel同用戶端進行網路連接、關閉和讀寫,生成相對應的event(改變selectinKey信息),觸發eventloop調度線程進行執行;
4、ChannelPipeline 找到對應 ChannelHandler 方法處理用戶邏輯。

    我們項目中使用的 Netty 服務端啟動類:

public class NettyServer {

    public static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    private static Integer LISTENER_PORT = PropertiesLoader.getResourcesLoader().getInteger("nettyPort");



    private int port;
    EventLoopGroup boss = null;
    EventLoopGroup worker = null;
    ServerBootstrap serverBootstrap = null;

    public static NettyServer nettyServer = null;

    public static NettyServer getInstance() {
        if (nettyServer == null) {
            synchronized (NettyServer.class) {
                if (nettyServer == null) {
                    nettyServer = new NettyServer(LISTENER_PORT==null?9999:LISTENER_PORT);
                }
            }
        }
        return nettyServer;
    }

    /**
     * 構造函數
     *
     * @param port 埠
     */
    private NettyServer(int port) {
        this.port = port;

    }

    /**
     * 綁定
     *
     * @throws InterruptedException
     */
    public void init() throws InterruptedException {
        try {

            //創建兩個線程池
            //目前伺服器CPU為單核8線程,調整線程為8
            boss = new NioEventLoopGroup(8);
            worker = new NioEventLoopGroup(8);

            serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker);//兩個工作線程
            serverBootstrap.channel(NioServerSocketChannel.class);
            //重用緩衝區
            serverBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            //自動調整下一次緩衝區建立時分配的空間大小,避免記憶體的浪費
            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
            //當伺服器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度,預設值50。
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            //用於啟用或關於Nagle演算法。如果要求高實時性,有數據發送時就馬上發送,就將該選項設置為true關閉Nagle演算法;如果要減少發送次數減少網路交互,就設置為false等累積一定大小後再發送。預設為false。
            serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
            //是否啟用心跳保活機制
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            //支持tcp協議
            //bootstrap.childHandler(new TcpChannelInitializer());

            //支持webSocket協議
            serverBootstrap.childHandler(new WebSocketChannelInitializer());
            ChannelFuture f = serverBootstrap.bind(port).sync();
            if (f.isSuccess()) {
                logger.info("netty server start...");
            }
            //等到服務端監聽埠關閉
            f.channel().closeFuture().sync();
        } finally {
            //優雅釋放線程資源
            boss.shutdownGracefully().sync();
            worker.shutdownGracefully().sync();
        }
    }

    /**
     * 銷毀netty相關資源
     */
    public void destroy() {
        try {
            if (boss != null) {
                boss.shutdownGracefully();
            }
            if (worker != null) {
                worker.shutdownGracefully();
            }
            if (serverBootstrap != null) {
                serverBootstrap = null;
            }
        } catch (Exception e) {
            logger.error("netty close err:" + e.getMessage(), e);
        }
    }
}
NettyServer.java

 

參考資料:《Netty IN ACTION》

演示源代碼:https://github.com/JMCuixy/NettyDemo


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 就是想記錄一下 JSP中傳參的四種方法: 1、form表單 2、request.setAttribute();和request.getAttribute(); 3、超鏈接:<a herf="index.jsp"?a=a&b=b&c=c>name</a> 4、<jsp:param> 發生Referen ...
  • python從2.6開始支持format,新的更加容易讀懂的字元串格式化方法, 從原來的% 模式變成新的可讀性更強的 綜合舉例說明: 輸入: '{:>18,.2f}'.format(70305084.0) # :冒號+空白填充+右對齊+固定寬度18+浮點精度.2+浮點數聲明f 輸出:' 70,305 ...
  • 原創 三羊獻瑞 觀察下麵的加法算式: 祥 瑞 生 輝 + 三 羊 獻 瑞 三 羊 生 瑞 氣 (如果有對齊問題,可以參看【圖1.jpg】) 其中,相同的漢字代表相同的數字,不同的漢字代表不同的數字。 請你填寫“三羊獻瑞”所代表的4位數字(答案唯一),不要填寫任何多餘內容。 分析:三羊生瑞氣這個數中三 ...
  • 13.1點擊更換圖形驗證碼 (1)front/signup.html (2)static/front/css/signup.css body { background: #f3f3f3; } .outer-box { width: 854px; background: #fff; margin: 0 ...
  • realloc 用方法 void realloc(void , n) 根據n的大小,如果n比較小,就沿用原來的記憶體地址(也就是返回的地址就是原來的地址),在原來地址的記憶體空間的最後面,加上n大小的記憶體空間;如果n比較大,系統就不會沿用原來的記憶體地址,系統有新開闢一個記憶體空間,並把原來記憶體空間里存放的 ...
  • Description 下麵的程式輸出結果是: A::Fun A::Do A::Fun C::Do 請填空: 程式代碼如下 ~~~~ include using namespace std; class A { private: int nVal; public: void Fun() { cout ...
  • 當我們瀏覽網頁的時候,經常會看到像下麵這些好看的圖片,你是否想把這些圖片保存下載下來。 我們最常規的做法就是通過滑鼠右鍵,選擇另存為。但有些圖片點擊滑鼠右鍵的時候並沒有另存為選項,或者你可以通過截圖工具截取下來,但這樣就降低圖片的清晰度,並且這樣效率很低。 那腫麽辦呢? 我們可以通過python 來 ...
  • Description 下麵程式的輸出結果是: destructor B destructor A 請完整寫出 class A。 限制條件:不得為 class A 編寫構造函數。 ~~~~ include using namespace std; class A { // Your Code Her ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...