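I. Setting Sail
Following the principle of going from nothing to something, and then from something to something better, I plan to implement the functionality over the web (HTTP) first and later optimize it by rewriting it to use sockets.
1. Technology choices
The web layer uses gmvc, the framework I wrote earlier (see "Hand-Written MVC Framework (1): Setting Out Again"). Messages are stored in a database, accessed through gdao, which I also wrote a while back (see "Hand-Written DAO Framework (1): Starting from '1'").
2. Project setup
The project started out as a single module. Splitting the server and the client into two unrelated projects would have been awkward to work with, so I switched to a Maven parent/child module layout.
The parent pom is configured as follows: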
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.shuimutong</groupId>
    <artifactId>gmq</artifactId>
    <version>${global.version}</version>
    <packaging>pom</packaging>
    <url>http://maven.apache.org</url>

    <modules>
        <module>gmq-server</module>
        <module>gmq-client</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <global.version>0.0.1-SNAPSHOT</global.version>
        <fastjson.version>1.2.60</fastjson.version>
        <gdao.version>2.0.0-SNAPSHOT</gdao.version>
        <gmvc.version>1.0.1-SNAPSHOT</gmvc.version>
        <gutil.version>0.0.2-SNAPSHOT</gutil.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>me.lovegao</groupId>
                <artifactId>gdao</artifactId>
                <version>${gdao.version}</version>
            </dependency>
            <dependency>
                <groupId>com.shuimutong</groupId>
                <artifactId>gmvc</artifactId>
                <version>${gmvc.version}</version>
            </dependency>
            <dependency>
                <groupId>com.shuimutong</groupId>
                <artifactId>gutil</artifactId>
                <version>${gutil.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.4</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.16</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.6.2</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The pom for the MQ server, which is the subject of this post, is as follows:
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.shuimutong</groupId>
        <artifactId>gmq</artifactId>
        <version>${global.version}</version>
    </parent>
    <groupId>com.shuimutong</groupId>
    <artifactId>gmq-server</artifactId>
    <packaging>war</packaging>
    <version>${global.version}</version>
    <name>gmq-server</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>me.lovegao</groupId>
            <artifactId>gdao</artifactId>
        </dependency>
        <dependency>
            <groupId>com.shuimutong</groupId>
            <artifactId>gmvc</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>javax.servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
    </dependencies>

    <build>
        <finalName>com.shuimutong.gmq_server</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
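The project depends on gdao (https://gitee.com/simpleha/gdao.git), gmvc (https://gitee.com/simpleha/gmvc.git), and gutil (https://gitee.com/simpleha/gutil.git). Each of them must first be cloned locally, then built and installed into the local Maven repository.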
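II. Interface Overview
1. SyncController
Before messages can be published or subscribed to, a topic has to be created. I have kept topic creation separate from the publish and subscribe endpoints that follow, mainly because the two classes are accessed at different frequencies; keeping them apart makes it easier to optimize each one later.
The implementation is as follows: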
package com.shuimutong.gmq.server.controller;

import java.util.List;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.shuimutong.gmq.server.bean.enums.RequestParamEnum;
import com.shuimutong.gmq.server.bean.enums.ResponseCodeEnum;
import com.shuimutong.gmq.server.bean.vo.ResponseDataVo;
import com.shuimutong.gmq.server.bean.vo.UriDescVo;
import com.shuimutong.gmq.server.exception.ServiceException;
import com.shuimutong.gmq.server.service.SyncService;
import com.shuimutong.gmq.server.service.TopicService;
import com.shuimutong.gmvc.annotation.XAutowired;
import com.shuimutong.gmvc.annotation.XController;
import com.shuimutong.gmvc.annotation.XRequestMapping;
import com.shuimutong.gmvc.util.RequestResolveUtil;

/**
 * Controller for synchronizing non-message information.
 * @ClassName: SyncController
 * @author: 水木桶
 * @date: 2019-10-20 21:45:47
 * @Copyright: 2019 [水木桶] All rights reserved.
 */
@XController
@XRequestMapping("/sync")
public class SyncController {
    private final static Logger log = LoggerFactory.getLogger(SyncController.class);

    @XAutowired
    private TopicService topicService;
    @XAutowired
    private SyncService syncService;

    /**
     * Returns the URI descriptions.
     * @param request
     * @param response
     */
    @XRequestMapping("/getPath")
    public void getPath(HttpServletRequest request, HttpServletResponse response) {
        List<UriDescVo> uriList = syncService.listUriDesc();
        ResponseDataVo responseData = new ResponseDataVo(ResponseCodeEnum.OK, uriList);
        RequestResolveUtil.returnJson(request, response, JSONObject.toJSONString(responseData));
    }

    /**
     * Adds a topic.
     * @param request
     * @param response
     */
    @XRequestMapping("/addTopic")
    public void addTopic(HttpServletRequest request, HttpServletResponse response) {
        ResponseDataVo responseData = null;
        String topic = request.getParameter(RequestParamEnum.TOPIC.getParamName());
        if (StringUtils.isBlank(topic)) {
            responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "topic is empty");
        } else {
            try {
                boolean addState = topicService.addTopic(topic);
                if (addState) {
                    responseData = new ResponseDataVo(ResponseCodeEnum.OK, "added successfully");
                } else {
                    responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "topic already exists");
                }
            } catch (ServiceException e) {
                log.error("addTopicException," + topic, e);
                responseData = new ResponseDataVo(ResponseCodeEnum.SERVER_ERROR);
            }
        }
        RequestResolveUtil.returnJson(request, response, JSONObject.toJSONString(responseData));
    }
}
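getPath() is not used yet; I will come back to it when it is needed.
2. MessageController
Once a topic has been created, messages can be published to it and consumed from it.
The implementation is as follows: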
package com.shuimutong.gmq.server.controller;

import java.util.List;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.shuimutong.gmq.server.bean.SystemConstant;
import com.shuimutong.gmq.server.bean.dos.TopicDo;
import com.shuimutong.gmq.server.bean.enums.RequestParamEnum;
import com.shuimutong.gmq.server.bean.enums.ResponseCodeEnum;
import com.shuimutong.gmq.server.bean.vo.ResponseDataVo;
import com.shuimutong.gmq.server.exception.ServiceException;
import com.shuimutong.gmq.server.service.MessageService;
import com.shuimutong.gmq.server.service.TopicService;
import com.shuimutong.gmvc.annotation.XAutowired;
import com.shuimutong.gmvc.annotation.XController;
import com.shuimutong.gmvc.annotation.XRequestMapping;
import com.shuimutong.gmvc.util.RequestResolveUtil;
import com.shuimutong.guti.bean.TwoTuple;

/**
 * Controller for sending and receiving messages.
 * @ClassName: MessageController
 * @author: 水木桶
 * @date: 2019-10-20 21:45:47
 * @Copyright: 2019 [水木桶] All rights reserved.
 */
@XController
@XRequestMapping(SystemConstant.STR_URL_MESSAGE)
public class MessageController {
    private final static Logger log = LoggerFactory.getLogger(MessageController.class);

    @XAutowired
    private MessageService messageService;
    @XAutowired
    private TopicService topicService;

    /**
     * Fetches messages.
     * @param request
     * @param response
     */
    @XRequestMapping(SystemConstant.STR_URL_GET_MESSAGE)
    public void getMessage(HttpServletRequest request, HttpServletResponse response) {
        ResponseDataVo responseData = null;
        String topic = request.getParameter(RequestParamEnum.TOPIC.getParamName());
        String offsetStr = request.getParameter(RequestParamEnum.OFFSET.getParamName());
        String sizeStr = request.getParameter(RequestParamEnum.SIZE.getParamName());
        if (StringUtils.isBlank(topic) || !StringUtils.isNumeric(offsetStr) || !StringUtils.isNumeric(sizeStr)) {
            responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "topic is empty or a number is invalid");
        } else {
            int offset = Integer.parseInt(offsetStr);
            int size = Integer.parseInt(sizeStr);
            if (offset < 0 || size < 1) {
                responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "number out of range");
            } else {
                try {
                    TopicDo topicDo = topicService.findByTopic(topic);
                    if (topicDo == null) {
                        responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "topic does not exist");
                    } else {
                        List<TwoTuple<Long, String>> list = messageService.listMessage(topic, offset, size);
                        responseData = new ResponseDataVo(ResponseCodeEnum.OK, list);
                    }
                } catch (ServiceException e) {
                    log.error("getMessageException", e);
                    responseData = new ResponseDataVo(ResponseCodeEnum.SERVER_ERROR);
                }
            }
        }
        RequestResolveUtil.returnJson(request, response, JSONObject.toJSONString(responseData));
    }

    /**
     * Receives a message sent by a producer to the server.
     * @param request
     * @param response
     */
    @XRequestMapping(SystemConstant.STR_URL_SEND_MESSAGE)
    public void sendMessage(HttpServletRequest request, HttpServletResponse response) {
        ResponseDataVo responseData = null;
        String topic = request.getParameter(RequestParamEnum.TOPIC.getParamName());
        String message = request.getParameter(RequestParamEnum.MESSAGE.getParamName());
        if (StringUtils.isBlank(topic) || StringUtils.isBlank(message)) {
            responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "topic or message is empty");
        } else {
            try {
                TopicDo topicDo = topicService.findByTopic(topic);
                if (topicDo == null) {
                    responseData = new ResponseDataVo(ResponseCodeEnum.PARAM_ERROR, "topic does not exist");
                } else {
                    messageService.saveMessage(topic, message);
                    responseData = new ResponseDataVo(ResponseCodeEnum.OK);
                }
            } catch (ServiceException e) {
                log.error("sendMessageException", e);
                responseData = new ResponseDataVo(ResponseCodeEnum.SERVER_ERROR);
            }
        }
        RequestResolveUtil.returnJson(request, response, JSONObject.toJSONString(responseData));
    }
}
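One detail in getMessage is worth a closer look. The handler checks offset and size with StringUtils.isNumeric and then calls Integer.parseInt. A digit-only string that is too large for an int (for example "99999999999") passes the first check but makes parseInt throw NumberFormatException, which the catch (ServiceException e) block does not cover. A minimal sketch of one way to close that gap follows; PagingParams and parseAtLeast are hypothetical names used only for illustration and are not part of gmq.

```java
import java.util.OptionalInt;

/** Hypothetical helper, not part of gmq: overflow-safe parsing of paging parameters. */
class PagingParams {

    /**
     * Parses a decimal string into an int that is at least {@code min}.
     * Returns an empty OptionalInt for null, empty, non-digit, or out-of-range input.
     */
    static OptionalInt parseAtLeast(String raw, int min) {
        if (raw == null || raw.isEmpty()) {
            return OptionalInt.empty();
        }
        // Accept ASCII digits only, so signs, spaces, and letters are rejected up front.
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c < '0' || c > '9') {
                return OptionalInt.empty();
            }
        }
        try {
            int value = Integer.parseInt(raw);
            return value >= min ? OptionalInt.of(value) : OptionalInt.empty();
        } catch (NumberFormatException e) {
            // Digit-only strings can still exceed Integer.MAX_VALUE.
            return OptionalInt.empty();
        }
    }
}
```

With a helper like this, the controller could call parseAtLeast(offsetStr, 0) and parseAtLeast(sizeStr, 1) and return PARAM_ERROR whenever either result is empty, so an oversized number is reported as a parameter error rather than surfacing as an unhandled exception.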
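3. CacheController
The MQ is designed around a pull model for consumption. With a pull model, the client itself has to keep track of how far it has consumed.
This cache interface was added for that purpose. It provides key-value storage, with the data kept in the database.
For the code, see the Gitee link at the end of this post.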
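To make the pull model concrete, here is a minimal in-memory sketch of one poll cycle: read the saved offset from the key-value store, fetch a batch, process it, and write the new offset back. Everything in it is an assumption made for illustration: the class names are hypothetical, the broker is a plain list rather than the real HTTP server, and the offset is taken to mean "return messages whose id is greater than this value", which the post does not specify.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a pull-style consumer; not the actual gmq client. */
class PullConsumerSketch {

    /** One stored message: an id plus a body, mirroring the (id, body) pairs the server returns. */
    static final class Message {
        final long id;
        final String body;

        Message(long id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    /** In-memory stand-in for the server: a message log plus a key-value store. */
    static final class InMemoryBroker {
        private final List<Message> log = new ArrayList<>();
        private final Map<String, String> kv = new HashMap<>();

        /** Appends a message; ids start at 1 and grow by one per message. */
        void send(String body) {
            log.add(new Message(log.size() + 1, body));
        }

        /** ASSUMPTION: returns up to {@code size} messages whose id is greater than {@code offset}. */
        List<Message> list(long offset, int size) {
            List<Message> batch = new ArrayList<>();
            for (Message m : log) {
                if (m.id > offset && batch.size() < size) {
                    batch.add(m);
                }
            }
            return batch;
        }

        void put(String key, String value) {
            kv.put(key, value);
        }

        String get(String key) {
            return kv.get(key);
        }
    }

    /** Pulls one batch, records each body, then saves the id of the last message as the new offset. */
    static List<String> pollOnce(InMemoryBroker broker, String offsetKey, int batchSize) {
        String saved = broker.get(offsetKey);
        long offset = (saved == null) ? 0L : Long.parseLong(saved);
        List<String> consumed = new ArrayList<>();
        for (Message m : broker.list(offset, batchSize)) {
            consumed.add(m.body);
            offset = m.id;
        }
        broker.put(offsetKey, Long.toString(offset));
        return consumed;
    }
}
```

Because the offset lives in the key-value store rather than in the consumer's memory, a consumer that restarts can resume from where it stopped. Saving the offset only after the batch has been processed means a crash mid-batch leads to redelivery rather than loss, which is a design choice of this sketch and not something the post states about gmq.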
With these interfaces in place, simple message publishing and subscribing already works, although messages still have to be fetched manually.
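Fetching manually here means issuing the HTTP requests yourself, for example from a browser or a small script. The sketch below only builds the request URLs. The /sync/addTopic path comes from SyncController above; the base URL, the parameter names topic, offset, and size, and the message path passed to getMessage are assumptions, because the values of RequestParamEnum and SystemConstant are not shown in this post.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical URL builder for calling the server by hand; not part of gmq. */
class GmqUrls {

    /** Joins base URL, path, and URL-encoded query parameters in insertion order. */
    static String build(String baseUrl, String path, Map<String, String> params) {
        StringBuilder sb = new StringBuilder(baseUrl).append(path);
        char separator = '?';
        for (Map.Entry<String, String> entry : params.entrySet()) {
            sb.append(separator)
              .append(encode(entry.getKey()))
              .append('=')
              .append(encode(entry.getValue()));
            separator = '&';
        }
        return sb.toString();
    }

    /** URL for creating a topic. ASSUMPTION: the request parameter is named "topic". */
    static String addTopic(String baseUrl, String topic) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("topic", topic);
        return build(baseUrl, "/sync/addTopic", params);
    }

    /**
     * URL for fetching messages. ASSUMPTION: the caller supplies the path, and the
     * parameters are named "topic", "offset", and "size".
     */
    static String getMessage(String baseUrl, String getMessagePath, String topic, int offset, int size) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("topic", topic);
        params.put("offset", Integer.toString(offset));
        params.put("size", Integer.toString(size));
        return build(baseUrl, getMessagePath, params);
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }
}
```

Encoding the parameter values matters as soon as a topic or message contains spaces, non-ASCII text, or characters such as & and =, which would otherwise corrupt the query string.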
III. Related Tables
Since the point of this post is to share, the material should be complete, so here are the database table structures involved.
1. Topic table
CREATE TABLE `gmq_topic` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `topic` varchar(255) NOT NULL DEFAULT '' COMMENT 'topic',
  `create_time` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT 'creation time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_idx_topic` (`topic`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='gmq topic table';
2. Message table
CREATE TABLE `gmq_message` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `topic` varchar(255) NOT NULL DEFAULT '' COMMENT 'topic',
  `message_body` text NOT NULL COMMENT 'message body',
  `create_time` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT 'creation time',
  PRIMARY KEY (`id`),
  KEY `idx_id_topic` (`id`,`topic`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='stored messages';
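3. kv table
CREATE TABLE `gmq_message` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `topic` varchar(255) NOT NULL DEFAULT '' COMMENT 'topic',
  `message_body` text NOT NULL COMMENT 'message body',
  `create_time` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT 'creation time',
  PRIMARY KEY (`id`),
  KEY `idx_id_topic` (`id`,`topic`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='stored messages';
Note: the statement listed under this heading is identical to the `gmq_message` definition in the previous subsection, which looks like a copy-paste slip. For the actual key-value table definition, refer to the repository linked at the end of this post.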
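IV. Summary
That covers the main functionality. Looking back, it is not a lot, but the implementation took many twists and turns, partly because of the frameworks.
So between the previous post and this one, I not only finished this project but also fixed some problems I ran into while using the frameworks. For example, values read from a bigint column really do come back as BigDecimal rather than long.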
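The bigint surprise is easy to guard against wherever a result-set value is converted by hand. The sketch below shows one defensive conversion. It is not the fix that went into gdao, which is not shown in this post, and NumberColumns and toLong are hypothetical names.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

/** Hypothetical helper, not part of gdao: converts a numeric column value to long. */
class NumberColumns {

    /**
     * Converts a value read from a numeric column to long.
     * Throws ArithmeticException if a BigDecimal or BigInteger does not fit in a long
     * (or has a fractional part), and IllegalArgumentException for null or other types.
     */
    static long toLong(Object columnValue) {
        if (columnValue instanceof BigDecimal) {
            // longValueExact refuses to truncate silently.
            return ((BigDecimal) columnValue).longValueExact();
        }
        if (columnValue instanceof BigInteger) {
            return ((BigInteger) columnValue).longValueExact();
        }
        if (columnValue instanceof Long
                || columnValue instanceof Integer
                || columnValue instanceof Short
                || columnValue instanceof Byte) {
            return ((Number) columnValue).longValue();
        }
        throw new IllegalArgumentException("Not an integral column value: " + columnValue);
    }
}
```

Casting the value straight to Long would fail with ClassCastException when the driver hands back a BigDecimal, while calling longValue() on it would silently drop anything that does not fit. The exact variants make an unexpected value visible instead.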
The gmq project has two main parts: the server, described above, and the client, which I will cover next.
Finally, the code is available at: https://gitee.com/simpleha/gmq.git