手寫MQ框架(四)-使用netty改造梳理

来源:https://www.cnblogs.com/shuimutong/archive/2020/01/01/12129195.html
-Advertisement-
Play Games

新手應該怎樣使用netty?如果將http服務(不含頁面)改造為使用socket的服務? ...


一、背景

書接上文手寫MQ框架(三)-客戶端實現 ,前面通過web的形式實現了mq的服務端和客戶端,現在計劃使用netty來改造一下。前段時間學習了一下netty的使用(https://www.w3cschool.cn/netty4userguide/52ki1iey.html)。大概有一些想法。

netty封裝了socket的使用,我們通過簡單的調用即可構建高性能的網路應用。我計劃採用以下例子來對gmq進行改造。

本文主要參考:https://www.w3cschool.cn/netty4userguide/、https://www.w3cschool.cn/essential_netty_in_action/

二、netty是什麼

Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高性能、高可靠性的網路伺服器和客戶端程式。

--來自https://www.w3cschool.cn/netty4userguide/52ki1iey.html

netty是一個java框架,是網路編程框架,支持非同步、事件驅動的特性,所以性能表現很好。

 

三、netty的簡單實現

1、服務端

1)SimpleServerHandler

Handler是處理器,handler 是由 Netty 生成用來處理 I/O 事件的。

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class SimpleServerHandler extends SimpleChannelInboundHandler<String> {
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("[SERVER] - " + incoming.remoteAddress() + " 離開\n");
        channels.remove(ctx.channel());
    }
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("[" + incoming.remoteAddress() + "]" + s);
        if(s == null || s.length() == 0) {
            incoming.writeAndFlush("消息是空的呀!\n");
        } else {
//            MqRouter<?> mqRouter = JSONObject.parseObject(s, MqRouter.class);
//            System.out.println(mqRouter.getUri());
            String responseMsg = "收到了," + s + "\n";
            incoming.writeAndFlush(responseMsg);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"線上");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常");
        
        cause.printStackTrace();
        ctx.close();
    }

}

2)SimpleServerInitializer

SimpleServerInitializer 用來增加多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleServerHandler 等。

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SimpleServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleServerHandler());
        
        System.out.println("SimpleChatClient:" + ch.remoteAddress() + "連接上");
    }

}

 

3)SimpleServer

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SimpleServer {
    private int port;

    public SimpleServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new SimpleServerInitializer()).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            System.out.println("SimpleChatServer 啟動了");

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();

            System.out.println("SimpleChatServer 關閉了");
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new SimpleServer(port).run();
    }
}

 

 2、客戶端

1)SimpleClientHandler

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class SimpleClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println("收到的信息:" + s);
    }

}

 

2)SimpleClientInitializer

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleClientHandler());
    }

}

 

3)SimpleClient

package me.lovegao.netty.learnw3c.mqdemo;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class SimpleClient {
    private final String host;
    private final int port;
    
    public SimpleClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        new SimpleClient("localhost", 8080).run();
    }
    
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleClientInitializer());
            Channel channel = bootstrap.connect(host, port).sync().channel();
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            while(true) {
                String line = in.readLine();
                if(line.equals("exit!")) {
                    break;
                }
                channel.writeAndFlush(line + "\r\n");
            }
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }



}

 

3、學習中的一些事

在我把教程上的代碼略微改了一下,測試時發現客戶端能夠發出消息,服務端能夠收到消息,服務端也走到了回覆客戶端的流程,但是客戶端卻收不到消息。還原代碼後是正常的,想了半天,最後才發現是改代碼的的時候漏掉了“\n”這個標識,以此導致客戶端始終不列印消息。

四、netty如何運用到gmq中

1、運用有什麼問題

netty只封裝了網路交互,gmq整體使用了gmvc框架,而gmvc框架目前還無法脫離servlet。而我又不太想把之前寫的代碼全部改為自己new的方式。

2、解決方式

1)改造gmvc框架

對gmvc框架進行重構,使得能夠脫離servlet使用。也就是將IOC功能剝離開。

優點:一步到位,符合整體的規劃。

缺點:gmq的迭代會延遲一段時間。

2)暫時拋棄gmvc框架

暫時將目前依賴的gmvc框架給去除掉,優先完成gmq的迭代。待後期gmvc框架改造完成後再進行改造。

優點:能夠儘早的完成gmq的功能。

缺點:先移除框架,後期再套上框架,相當於做了兩次多餘的功。費時費力。

3、結論

寫框架就是為了學習,寫GMVC、寫GMQ目的都一樣。時間寶貴,減少多餘功,先對GMVC框架進行改造。

4、一些其他事

運用netty還有一個事,就是路由的問題。使用netty代替servlet,需要解決路由的問題。

五、準備改造GMVC

敬請期待……


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 3.1 將元組(1,2,3) 和集合{"four",5,6}合成一個列表 1 tuple,set,list = (1,2,3),{"four",5,6},[] 2 for i in tuple: 3 list.append(i) 4 for j in set: 5 list.append(j) 6 ...
  • 繼承和派生 01 繼承和派生的概念 繼承: 在定義一個新的類 B 時,如果該類與某個已有的類 A 相似(指的是 B 擁有 A 的全部特點),那麼就可以把 A 作為一個基類,而把B作為基類的一個派生類(也稱子類)。 派生類: 派生類是通過對基類進行修改和擴充得到的,在派生類中,可以擴充新的成員變數和成 ...
  • 通過對Logback的AsyncAppender以及RollingFileAppender源碼進行解析,學習Logback對文件IO的操作細節 ...
  • 自定義的模版過濾器必須要放在app中,並且該app必須在INSTALLED_APPS中進行安裝。然後再在這個app下麵創建一個python包叫做templatetags(這個名字是固定的,不能隨意更改)。再在這個包下麵創建一個python文件。然後在這個文件中寫過濾器。 過濾器實際上就是python ...
  • python3.5.4 遞歸函數最噁心的時候莫非棧溢出(Stack overflow)。PS:另外很多人在學習Python的過程中,往往因為沒有好的教程或者沒人指導從而導致自己容易放棄,為此我建了個Python交流.裙 :一久武其而而流一思(數字的諧音)轉換下可以找到了,裡面有最新Python教程項 ...
  • 本系列筆記主要基於《深入理解Java虛擬機:JVM高級特性與最佳實踐 第2版》,是這本書的讀書筆記。 Java虛擬機的記憶體區域中,程式計數器、Java棧和本地方法棧是線程私有的,隨線程而生隨線程而滅,因此這幾個區域的記憶體回收和分配都有確定性,所以主要探究的是Java堆和方法區的記憶體分配及回收。 Ja ...
  • Markdown MarkDown用法/註意事項備忘 本文用於記錄Markdown的編寫規範與心得,包含vscode相關的配置。原文是用markdown格式寫的,稍微改了下發了博客,格式可能會很奇怪。。 Markdown是一種輕量級的標記語言。標記語言(Markup Language)是一種將文本以 ...
  • 這個實例主要給大家介紹如何使用jQuery+PHP+MySQL來實現線上測試題,包括動態讀取題目,答題完畢後臺評分,並返回答題結果。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...