手寫MQ框架(二)-服務端實現

来源:https://www.cnblogs.com/shuimutong/archive/2019/11/24/11923296.html
-Advertisement-
Play Games

一、起航 本著從無到有,從有到優的原則,所以計劃先通過web實現功能,然後再優化改寫為socket的形式。 1、關於技術選型 web框架使用了之前寫的gmvc框架(手寫MVC框架(一)-再出發),消息存儲採用存在資料庫的方式,使用的框架也是前段時間寫的gdao(手寫DAO框架(一)-從“1”開始 ) ...


一、起航

本著從無到有,從有到優的原則,所以計劃先通過web實現功能,然後再優化改寫為socket的形式。

1、關於技術選型

web框架使用了之前寫的gmvc框架(手寫MVC框架(一)-再出發),消息存儲採用存在資料庫的方式,使用的框架也是前段時間寫的gdao(手寫DAO框架(一)-從“1”開始 )。

2、項目搭建

項目本來是單項目的形式,但是考慮到將服務端、客戶端分開不是很友好,所以採用了maven父子模塊的形式。

其中,父pom配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shuimutong</groupId>
    <artifactId>gmq</artifactId>
    <version>${global.version}</version>
    <packaging>pom</packaging>
    <url>http://maven.apache.org</url>

    <modules>
        <module>gmq-server</module>
        <module>gmq-client</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <global.version>0.0.1-SNAPSHOT</global.version>
        <fastjson.version>1.2.60</fastjson.version>
        <gdao.version>2.0.0-SNAPSHOT</gdao.version>
        <gmvc.version>1.0.1-SNAPSHOT</gmvc.version>
        <gutil.version>0.0.2-SNAPSHOT</gutil.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>me.lovegao</groupId>
                <artifactId>gdao</artifactId>
                <version>${gdao.version}</version>
            </dependency>
            <dependency>
                <groupId>com.shuimutong</groupId>
                <artifactId>gmvc</artifactId>
                <version>${gmvc.version}</version>
            </dependency>
            <dependency>
                <groupId>com.shuimutong</groupId>
                <artifactId>gutil</artifactId>
                <version>${gutil.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.4</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.16</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.6.2</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
View Code

這次要說的mq服務端pom配置如下:

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.shuimutong</groupId>
        <artifactId>gmq</artifactId>
        <version>${global.version}</version>
    </parent>
    <groupId>com.shuimutong</groupId>
    <artifactId>gmq-server</artifactId>
    <packaging>war</packaging>
    <version>${global.version}</version>
    <name>gmq-server</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>me.lovegao</groupId>
            <artifactId>gdao</artifactId>
        </dependency>
        <dependency>
            <groupId>com.shuimutong</groupId>
            <artifactId>gmvc</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>javax.servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
    </dependencies>
    <build>
        <finalName>com.shuimutong.gmq_server</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
View Code

 

 項目依賴的gdao(https://gitee.com/simpleha/gdao.git)、gmvc(https://gitee.com/simpleha/gmvc.git)、gutil(https://gitee.com/simpleha/gutil.git)需要先clone到本地並編譯到maven倉庫。

二、介面梳理

1、SyncController

在發佈消息或者訂閱消息前,我們需要先新增topic。我這裡把新增topic和後面的發佈訂閱消息分開了,主要是考慮到兩個類被訪問頻率有差別,分開後有利於以後的針對優化。

具體實現如下:

package com.shuimutong.gmq.server.controller;

import java.util.List;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.shuimutong.gmq.server.bean.enums.RequestParamEnum;
import com.shuimutong.gmq.server.bean.enums.ResponseCodeEnum;
import com.shuimutong.gmq.server.bean.vo.ResponseDataVo;
import com.shuimutong.gmq.server.bean.vo.UriDescVo;
import com.shuimutong.gmq.server.exception.ServiceException;
import com.shuimutong.gmq.server.service.SyncService;
import com.shuimutong.gmq.server.service.TopicService;
import com.shuimutong.gmvc.annotation.XAutowired;
import com.shuimutong.gmvc.annotation.XController;
import com.shuimutong.gmvc.annotation.XRequestMapping;
import com.shuimutong.gmvc.util.RequestResolveUtil;

/**
 * 非消息信息同步controller
 * @ClassName:  MessageController   
 * @Description:(這裡用一句話描述這個類的作用)   
 * @author: 水木桶
 * @date:   2019年10月20日 下午9:45:47     
 * @Copyright: 2019 [水木桶]  All rights reserved.
 */
@XController
@XRequestMapping("/sync")
public class SyncController {
    private final static Logger log = LoggerFactory.getLogger(SyncController.class);
    @XAutowired
    private TopicService topicService;
    @XAutowired
    private SyncService syncService;

    /**
     * 獲取uri說明
     * @param request
     * @param reponse
     */
    @XRequestMapping("/getPath")
    public void getPath(HttpServletRequest request, HttpServletResponse reponse) {
        List<UriDescVo> uriList = syncService.listUriDesc();
        ResponseDataVo responseData = new ResponseDataVo(ResponseCodeEnum.OK, uriList);
        RequestResolveUtil.returnJson(request, reponse, JSONObject.toJSONString(responseData));
    }
    
    /**
     * 新增topic
     * @param request
     * @param reponse
     */
    @XRequestMapping("/addTopic")
    public void addTopic(HttpServletRequest request, HttpServletResponse reponse) {
        ResponseDataVo responseData = null;
        String topic = request.getParameter(RequestParamEnum.TOPIC.getParamName());
        if(StringUtils.isBlank(topic)) {
            responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主題為空");
        } else {
            try {
                boolean addState = topicService.addTopic(topic);
                if(addState) {
                    responseData = new ResponseDataVo(ResponseCodeEnum.OK, "添加成功");
                } else {
                    responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主題已存在");
                }
            } catch (ServiceException e) {
                log.error("addTopicException," + topic, e);
                responseData = new ResponseDataVo(ResponseCodeEnum.SERVER_ERROR);
            }
        }
        RequestResolveUtil.returnJson(request, reponse, JSONObject.toJSONString(responseData));
    }
}

 

其中getPath()暫時沒有用到,以後用到再討論吧。

2、MessageController

消息的topic創建之後,就是消息的發佈和訂閱了。

具體實現如下:

package com.shuimutong.gmq.server.controller;

import java.util.List;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.shuimutong.gmq.server.bean.SystemConstant;
import com.shuimutong.gmq.server.bean.dos.TopicDo;
import com.shuimutong.gmq.server.bean.enums.RequestParamEnum;
import com.shuimutong.gmq.server.bean.enums.ResponseCodeEnum;
import com.shuimutong.gmq.server.bean.vo.ResponseDataVo;
import com.shuimutong.gmq.server.exception.ServiceException;
import com.shuimutong.gmq.server.service.MessageService;
import com.shuimutong.gmq.server.service.TopicService;
import com.shuimutong.gmvc.annotation.XAutowired;
import com.shuimutong.gmvc.annotation.XController;
import com.shuimutong.gmvc.annotation.XRequestMapping;
import com.shuimutong.gmvc.util.RequestResolveUtil;
import com.shuimutong.guti.bean.TwoTuple;

/**
 * 發消息、收消息controller
 * @ClassName:  MessageController   
 * @Description:(這裡用一句話描述這個類的作用)   
 * @author: 水木桶
 * @date:   2019年10月20日 下午9:45:47     
 * @Copyright: 2019 [水木桶]  All rights reserved.
 */
@XController
@XRequestMapping(SystemConstant.STR_URL_MESSAGE)
public class MessageController {
    private final static Logger log = LoggerFactory.getLogger(MessageController.class);
    @XAutowired
    private MessageService messageService;
    @XAutowired
    private TopicService topicService;
    

    /**
     * 獲取消息
     * @param request
     * @param reponse
     */
    @XRequestMapping(SystemConstant.STR_URL_GET_MESSAGE)
    public void getMessage(HttpServletRequest request, HttpServletResponse reponse) {
        ResponseDataVo responseData = null;
        String topic = request.getParameter(RequestParamEnum.TOPIC.getParamName());
        String offsetStr = request.getParameter(RequestParamEnum.OFFSET.getParamName());
        String sizeStr = request.getParameter(RequestParamEnum.SIZE.getParamName());
        
        if(StringUtils.isBlank(topic) || !StringUtils.isNumeric(offsetStr) || !StringUtils.isNumeric(sizeStr)) {
            responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主題為空或者數字非法");
        } else {
            int offset = Integer.parseInt(offsetStr);
            int size = Integer.parseInt(sizeStr);
            if(offset < 0 || size < 1) {
                responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "數字異常");
            } else {
                try {
                    TopicDo topicDo = topicService.findByTopic(topic);
                    if(topicDo == null) {
                        responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主題不存在");
                    } else {
                        List<TwoTuple<Long, String>> list = messageService.listMessage(topic, offset, size);
                        responseData = new ResponseDataVo(ResponseCodeEnum.OK, list);
                    }
                } catch (ServiceException e) {
                    log.error("getMessageException", e);
                    responseData = new ResponseDataVo(ResponseCodeEnum.SERVER_ERROR);
                }
            }
        }
        RequestResolveUtil.returnJson(request, reponse, JSONObject.toJSONString(responseData));
    }
    
    
    /**
     * 生產方發送消息到服務端
     * @param request
     * @param reponse
     */
    @XRequestMapping(SystemConstant.STR_URL_SEND_MESSAGE)
    public void sendMessage(HttpServletRequest request, HttpServletResponse reponse) {
        ResponseDataVo responseData = null;
        String topic = request.getParameter(RequestParamEnum.TOPIC.getParamName());
        String message = request.getParameter(RequestParamEnum.MESSAGE.getParamName());
        if(StringUtils.isBlank(topic) || StringUtils.isBlank(message)) {
            responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主題或者消息為空");
        } else {
            try {
                TopicDo topicDo = topicService.findByTopic(topic);
                if(topicDo == null) {
                    responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "主題不存在");
                } else {
                    messageService.saveMessage(topic, message);
                    responseData = new ResponseDataVo(ResponseCodeEnum.OK);
                }
            } catch (ServiceException e) {
                log.error("sendMessageException", e);
                responseData = new ResponseDataVo(ResponseCodeEnum.SERVER_ERROR);
            }
        }
        RequestResolveUtil.returnJson(request, reponse, JSONObject.toJSONString(responseData));
    }
}

 

3、CacheController

設計的mq的消費形式是拉的形式。採用拉的形式,就需要客戶端自己去計數,消費到哪了。

所以這裡增加了這個緩存介面,提供kv存儲的功能。數據存儲在資料庫里。

代碼請移步到下文的gitee連接查看。

 

有了上面這幾個介面,就可以實現簡單的發佈消息、訂閱消息的功能了,當然是手動的方式獲取。

三、相關表

既然為了分享,資料就得提供全。所以這裡就列一下關聯的資料庫表結構。

1、主題表

CREATE TABLE `gmq_topic` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `topic` varchar(255) NOT NULL DEFAULT '' COMMENT '主題',
  `create_time` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '創建時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_idx_topic` (`topic`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='gmq-主題表';

 

2、消息表

CREATE TABLE `gmq_message` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `topic` varchar(255) NOT NULL DEFAULT '' COMMENT '主題',
  `message_body` text NOT NULL COMMENT '消息內容',
  `create_time` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '創建時間',
  PRIMARY KEY (`id`),
  KEY `idx_id_topic` (`id`,`topic`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='存儲的消息';

 

3、kv表

CREATE TABLE `gmq_message` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `topic` varchar(255) NOT NULL DEFAULT '' COMMENT '主題',
  `message_body` text NOT NULL COMMENT '消息內容',
  `create_time` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '創建時間',
  PRIMARY KEY (`id`),
  KEY `idx_id_topic` (`id`,`topic`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='存儲的消息';

 

四、總結

主要功能就是上面些。回頭看來也不算很多,但是具體實現過程中真是曲折重重,這其中一部分是框架的原因。

所以上篇博客到這篇博客中間,我不僅寫完了這個項目,其實還順便修複了框架使用中遇到的一些問題。比如bigint類型的數據查出來真的是BigDecimal,而不是long。

gmq項目主要包括兩塊,上面介紹的是服務端,接下來會介紹客戶端。

 

最後,附上代碼地址:https://gitee.com/simpleha/gmq.git

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本文主要對Java IO相關知識點做了結構性梳理,包括了Java IO的作用,數據源File類,輸入流,輸出流,位元組流,字元流,以及緩衝流,不同場景下的更細化的流操作類型,同時用了一個文件拷貝代碼簡單地說明瞭主要的流操作 ...
  • 新聞 "相遇WebWindow,.NET Core上的跨平臺webview類庫" "使用Bolero在WebAssembly中運行F " "用於你團隊代碼庫的AI輔助IntelliSense" "Jupyter Notebook里的ML.NET" 視頻及幻燈片 "Monads" "使用React,E ...
  • FBV 基於函數的視圖 (function base views) CBV 基於類的視圖 (class base views) 也就是說我們是用函數編寫視圖~還是類編寫視圖~~我們來看下兩個的簡單實現~~ urlpatterns = [ path(‘admin/‘, admin.site.urls) ...
  • 一.hashlib(md5) 1 import hashlib 2 obj = hashlib.md5('dsfd'.encode('utf-8')) 3 obj.update('123'.encode('utf-8')) 4 print(obj.hexdigest()) 二.random 1.ra ...
  • verilog語言簡介 verilog語言是一種語法類似於c的語言,但是與c語言也有不同之處,比如: 1.verilog語言是並行的,每個always塊都是同時執行,而c語言是順序執行的 2.verilog又被稱作硬體描述語言,在用verilog語言編程的時候,不如說是在用verilog描述一段電路 ...
  • [TOC] 靜態文件 預設情況下所有的html文件都是放在templates文件夾內 什麼是靜態文件 網站所使用的提前寫的css、js 第三方的前端模塊、圖片都叫做靜態資源 預設情況下網站使用的靜態資源全部會放到static文件夾下 通常情況下 在static文件夾內部還會再建其他文件夾 這是為了更 ...
  • 目前主流的三種web服務交互方案: REST (Representational State Transfer) 表徵性狀態轉移 SOAP (Simple Object Access Protocol)簡單的對象訪問協議 XML RPC (XML Remote Procedure Call)基於XM ...
  • 一、背景 書接手寫MQ框架(二)-服務端實現 ,前面介紹了服務端的實現。但是具體使用框架過程中,用戶肯定是以客戶端的形式跟服務端打交道的。客戶端的好壞直接影響了框架使用的便利性。 雖然框架目前是通過web的形式提供功能的,但是某的目標其實是通過socket實現,所以不僅需要有客戶端,還要包裝一下,讓 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...