Netty實現客戶端和服務端通信簡單例子

来源:https://www.cnblogs.com/kingsonfu/archive/2018/03/23/Java.html
-Advertisement-
Play Games

Netty是建立在NIO基礎之上,Netty在NIO之上又提供了更高層次的抽象。 在Netty裡面,Accept連接可以使用單獨的線程池去處理,讀寫操作又是另外的線程池來處理。 Accept連接和讀寫操作也可以使用同一個線程池來進行處理。而請求處理邏輯既可以使用單獨的線程池進行處理,也可以跟放在讀寫 ...


Netty是建立在NIO基礎之上,Netty在NIO之上又提供了更高層次的抽象。

在Netty裡面,Accept連接可以使用單獨的線程池去處理,讀寫操作又是另外的線程池來處理。

Accept連接和讀寫操作也可以使用同一個線程池來進行處理。而請求處理邏輯既可以使用單獨的線程池進行處理,也可以跟放在讀寫線程一塊處理。線程池中的每一個線程都是NIO線程。用戶可以根據實際情況進行組裝,構造出滿足系統需求的併發模型。

Netty提供了內置的常用編解碼器,包括行編解碼器[一行一個請求],首碼長度編解碼器[前N個位元組定義請求的位元組長度],可重放解碼器[記錄半包消息的狀態],HTTP編解碼器,WebSocket消息編解碼器等等

Netty提供了一些列生命周期回調介面,當一個完整的請求到達時,當一個連接關閉時,當一個連接建立時,用戶都會收到回調事件,然後進行邏輯處理。

Netty可以同時管理多個埠,可以使用NIO客戶端模型,這些對於RPC服務是很有必要的。

Netty除了可以處理TCP Socket之外,還可以處理UDP Socket。

在消息讀寫過程中,需要大量使用ByteBuffer,Netty對ByteBuffer在性能和使用的便捷性上都進行了優化和抽象。

代碼:

服務端:

package com.kinson.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * descripiton:服務端
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 15:37
 * @modifier:
 * @since:
 */
public class NettyServer {
    /**
     * 埠
     */
    private int port;

    public NettyServer(int port) {
        this.port = port;
    }

    public void run() {
        //EventLoopGroup是用來處理IO操作的多線程事件迴圈器
        //負責接收客戶端連接線程
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //負責處理客戶端i/o事件、task任務、監聽任務組
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //啟動 NIO 服務的輔助啟動類
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup);
        //配置 Channel
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ServerIniterHandler());
        //BACKLOG用於構造服務端套接字ServerSocket對象,
        // 標識當伺服器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        //是否啟用心跳保活機制
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            //綁定服務埠監聽
            Channel channel = bootstrap.bind(port).sync().channel();
            System.out.println("server run in port " + port);
            //伺服器關閉監聽
            /*channel.closeFuture().sync()實際是如何工作:
            channel.closeFuture()不做任何操作,只是簡單的返回channel對象中的closeFuture對象,對於每個Channel對象,都會有唯一的一個CloseFuture,用來表示關閉的Future,
            所有執行channel.closeFuture().sync()就是執行的CloseFuturn的sync方法,從上面的解釋可以知道,這步是會將當前線程阻塞在CloseFuture上*/
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //關閉事件流組
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyServer(8899).run();
    }
}

服務端業務邏輯處理:

package com.kinson.netty.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * descripiton: 伺服器的處理邏輯
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 15:50
 * @modifier:
 * @since:
 */
public class ServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 所有的活動用戶
     */
    public static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 讀取消息通道
     *
     * @param context
     * @param s
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext context, String s)
            throws Exception {
        Channel channel = context.channel();
        //當有用戶發送消息的時候,對其他的用戶發送消息
        for (Channel ch : group) {
            if (ch == channel) {
                ch.writeAndFlush("[you]: " + s + "\n");
            } else {
                ch.writeAndFlush("[" + channel.remoteAddress() + "]: " + s + "\n");
            }
        }
        System.out.println("[" + channel.remoteAddress() + "]: " + s + "\n");
    }

    /**
     * 處理新加的消息通道
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        for (Channel ch : group) {
            if (ch == channel) {
                ch.writeAndFlush("[" + channel.remoteAddress() + "] coming");
            }
        }
        group.add(channel);
    }

    /**
     * 處理退出消息通道
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        for (Channel ch : group) {
            if (ch == channel) {
                ch.writeAndFlush("[" + channel.remoteAddress() + "] leaving");
            }
        }
        group.remove(channel);
    }

    /**
     * 在建立連接時發送消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        boolean active = channel.isActive();
        if (active) {
            System.out.println("[" + channel.remoteAddress() + "] is online");
        } else {
            System.out.println("[" + channel.remoteAddress() + "] is offline");
        }
        ctx.writeAndFlush("[server]: welcome");
    }

    /**
     * 退出時發送消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        if (!channel.isActive()) {
            System.out.println("[" + channel.remoteAddress() + "] is offline");
        } else {
            System.out.println("[" + channel.remoteAddress() + "] is online");
        }
    }

    /**
     * 異常捕獲
     *
     * @param ctx
     * @param e
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("[" + channel.remoteAddress() + "] leave the room");
        ctx.close().sync();
    }

}

服務端處理器註冊:

package com.kinson.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;


/**
 * descripiton: 伺服器初始化
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 15:46
 * @modifier:
 * @since:
 */
public class ServerIniterHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //管道註冊handler
        ChannelPipeline pipeline = socketChannel.pipeline();
        //編碼通道處理
        pipeline.addLast("decode", new StringDecoder());
        //轉碼通道處理
        pipeline.addLast("encode", new StringEncoder());
        //聊天服務通道處理
        pipeline.addLast("chat", new ServerHandler());
    }
}

客戶端:

package com.kinson.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * descripiton: 客戶端
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 16:40
 * @modifier:
 * @since:
 */
public class NettyClient {

    private String ip;

    private int port;

    private boolean stop = false;

    public NettyClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public void run() throws IOException {
        //設置一個多線程迴圈器
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //啟動附註類
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup);
        //指定所使用的NIO傳輸channel
        bootstrap.channel(NioSocketChannel.class);
        //指定客戶端初始化處理
        bootstrap.handler(new ClientIniterHandler());
        try {
            //連接服務
            Channel channel = bootstrap.connect(ip, port).sync().channel();
            while (true) {
                //向服務端發送內容
                BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                String content = reader.readLine();
                if (StringUtils.isNotEmpty(content)) {
                    if (StringUtils.equalsIgnoreCase(content, "q")) {
                        System.exit(1);
                    }
                    channel.writeAndFlush(content);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.exit(1);
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyClient("127.0.0.1", 8899).run();
    }
}

客戶端邏輯處理:

package com.kinson.netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * descripiton: 客戶端邏輯處理
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 16:50
 * @modifier:
 * @since:
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        //列印服務端的發送數據
        System.out.println(s);
    }
}

客戶端處理器註冊:

package com.kinson.netty.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * descripiton: 客戶端處理初始化
 *
 * @author: www.iknowba.cn
 * @date: 2018/3/23
 * @time: 16:55
 * @modifier:
 * @since:
 */
public class ClientIniterHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //註冊管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("http", new HttpClientCodec());
        pipeline.addLast("chat", new ClientHandler());
    }
}

測試時先啟動服務端,再啟動客戶端。。。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、二叉樹就是這麼簡單 本文撇開一些非常苦澀、難以理解的概念來講講二叉樹,僅入門觀看(或複習).... 首先,我們來講講什麼是樹: 樹是一種 非線性 的數據結構,相對於線性的數據結構(鏈表、數組)而言, 樹的平均運行時間更短(往往與樹相關的排序時間複雜度都不會高) 在現實生活中,我們一般的樹長這個樣 ...
  • 本文內容: 抽象類 介面 抽象類與介面的異同 首發日期:2018-03-24 抽象類: 雖然已經有了父類,但有時候父類也是無法直接描述某些共有屬性的,比如哺乳類和人類都會叫,而一般來說哺乳類這個父類並沒有準確定義“叫”的屬性的,顯然應該由子類來決定怎麼“叫”,但“叫”這個屬性是共有的,那麼可以把這個 ...
  • Java long數據類型 位有符號的Java原始數據類型。當對整數的計算結果可能超出int數據類型的範圍時使用。 位有符號的Java原始數據類型。當對整數的計算結果可能超出int數據類型的範圍時使用。 long數據類型範圍是-9,223,372,036,854,775,808至9,223,372, ...
  • 從這篇文章開始,來記錄我的Django學習過程。 Day-1 一 首先是安裝Django版本的選擇,我所選擇的是Django1.11。為什麼選擇1.11呢,因為它是長期技術支持版(LTS),官方會加上新特性,和修複一些BUG。 好了,版本選擇好了,該安裝了,在我這兒安裝有兩種方法: 1. 打開CMD ...
  • 首先回憶下以前學的函數重載 函數重載 函數重載的本質為相互獨立的不同函數 通過函數名和函數參數來確定函數調用 無法直接通過函數名得到重載函數的入口地址 函數重載必然發生在同一個作用域中 類中的函數重載 靜態成員函數能與普通成員函數建立重載關係 全局函數和成員函數不能構成重載關係 操作符重載(oper ...
  • 二叉樹作為的基本數據結構,應用廣泛,在生活中處處可見,而遍歷二叉樹在二叉樹應用中十分常見。與線性存儲結構不同,二叉樹每個節點都有可能有兩棵子樹,從二叉樹的存儲結構可知: 根節點、左子樹、右子樹——二叉樹的基本組成單位。那麼,根據的遞歸的思想(數據結構嚴蔚敏版):當一個複雜的問題可以分解成若幹子問題來 ...
  • 對象模型示例: ![][1] 繼承映射的實現方式有以下三種: (一)每棵類繼承樹一張表 (二)每個類一張表 (三)每個子類一張表 (一)每棵類繼承樹一張表 關係模型如下: ![][2] 映射文件如下: 說明: 因為類繼承樹肯定是對應多個類,要把多個類的信息存放在一張表中,必須有某種機制來區分哪些記錄 ...
  • 最近項目中有個需求,需要從一個List中,抽取其中一個項目,生成一個新的List。下麵是實現代碼。 method = (tClass.newInstance()).getClass().getMethod(methodName); 有一點需要註意取得方法名的時候,需要先生成T的一個實例,tClass ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...