手寫MQ框架(三)-客戶端實現

来源:https://www.cnblogs.com/shuimutong/archive/2019/11/24/11923420.html
-Advertisement-
Play Games

一、背景 書接手寫MQ框架(二)-服務端實現 ,前面介紹了服務端的實現。但是具體使用框架過程中,用戶肯定是以客戶端的形式跟服務端打交道的。客戶端的好壞直接影響了框架使用的便利性。 雖然框架目前是通過web的形式提供功能的,但是某的目標其實是通過socket實現,所以不僅需要有客戶端,還要包裝一下,讓 ...


一、背景

書接手寫MQ框架(二)-服務端實現  ,前面介紹了服務端的實現。但是具體使用框架過程中,用戶肯定是以客戶端的形式跟服務端打交道的。客戶端的好壞直接影響了框架使用的便利性。

雖然框架目前是通過web的形式提供功能的,但是某的目標其實是通過socket實現,所以不僅需要有客戶端,還要包裝一下,讓用戶在使用過程中不需要關心服務端是如何實現的。

簡單來說,就是客戶端使用必須方便。

二、客戶端實現

1、HttpUtil

目前客戶端的核心功能是HttpUtil這個類,使用httpClient實現的,主要是為了請求服務端。

具體實現如下:

package com.shuimutong.gmq.client.util;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.shuimutong.gmq.client.bean.HttpResponseBean;
import com.shuimutong.gutil.common.GUtilCommonUtil;

/**
 * http請求工具類
 * @ClassName: HttpUtil
 * @Description:(這裡用一句話描述這個類的作用)
 * @author: 水木桶
 * @date: 2019年10月29日 下午9:43:54
 * @Copyright: 2019 [水木桶] All rights reserved.
 */
public class HttpUtil {
    private final static Logger log = LoggerFactory.getLogger(HttpUtil.class);
    private static CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal();
    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    HTTP_CLIENT.close();
                } catch (IOException e) {
                    log.error("HTTP_CLIENT-closeException", e);
                }
            }
        });
    }

    /**
     * get請求
     * 
     * @param url
     * @return
     * @throws IOException
     */
    public static HttpResponseBean get(String url) throws IOException {
        HttpResponseBean responseBean = null;
        HttpGet httpGet = new HttpGet(url);
        CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet);
        try {
            HttpEntity httpEntity = res.getEntity();
            String body = EntityUtils.toString(httpEntity);
            responseBean = new HttpResponseBean(res.getStatusLine(), body);
            EntityUtils.consume(httpEntity);
        } finally {
            res.close();
        }
        return responseBean;
    }
    
    /**
     * 帶參數的get請求
     * @param url
     * @param requsetParams
     * @return
     * @throws IOException
     * @throws URISyntaxException
     */
    public static HttpResponseBean get(String url, Map<String, String> requsetParams) throws IOException {
        HttpResponseBean responseBean = null;
        HttpGet httpGet;
        try {
            URIBuilder uriBuilder = new URIBuilder(url);
            if(!GUtilCommonUtil.checkListEmpty(requsetParams)) {
                List<NameValuePair> nvps = new ArrayList<NameValuePair>();
                requsetParams.forEach((k,v) -> {
                    nvps.add(new BasicNameValuePair(k, v));
                });
                uriBuilder.setParameters(nvps);
            }
            httpGet = new HttpGet(uriBuilder.build());
        } catch (Exception e) {
            throw new IOException(e);
        }
        CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet);
        try {
            HttpEntity httpEntity = res.getEntity();
            String body = EntityUtils.toString(httpEntity);
            responseBean = new HttpResponseBean(res.getStatusLine(), body);
            EntityUtils.consume(httpEntity);
        } finally {
            res.close();
        }
        return responseBean;
    }

    /**
     * post請求
     * @param url
     * @param requsetParams
     * @return
     * @throws IOException
     */
    public static HttpResponseBean post(String url, Map<String, String> requsetParams) throws IOException {
        HttpResponseBean responseBean = null;
        HttpPost httpPost = new HttpPost(url);
        if(!GUtilCommonUtil.checkListEmpty(requsetParams)) {
            List<NameValuePair> nvps = new ArrayList<NameValuePair>();
            requsetParams.forEach((k,v) -> {
                nvps.add(new BasicNameValuePair(k, v));
            });
            httpPost.setEntity(new UrlEncodedFormEntity(nvps));
        }
        CloseableHttpResponse response = HTTP_CLIENT.execute(httpPost);
        try {
            HttpEntity httpEntity = response.getEntity();
            String body = EntityUtils.toString(httpEntity);
            responseBean = new HttpResponseBean(response.getStatusLine(), body);
            EntityUtils.consume(httpEntity);
        } finally {
            response.close();
        }
        return responseBean;
    }
}

 

封裝了get請求和post請求,封裝了響應結果。

加了一個鉤子,在jvm關閉時能夠主動關閉創建的資源。

2、訂閱消息、生產消息

這兩部分主要就是調用上面的HttpUtil,然後將結果包裝一下。

具體代碼請參考前文的git。

3、實例管理

為了使得用戶不需要關心具體實現,所以建了實例管理類。

package com.shuimutong.gmq.client.util;

import com.shuimutong.gmq.client.cache.CommonObjCache;
import com.shuimutong.gmq.client.cache.impl.CommonObjCacheImpl;
import com.shuimutong.gmq.client.consumer.GmqConsumer;
import com.shuimutong.gmq.client.producer.GmqProducer;

public class GmqInstanceManage {
    public static GmqProducer getGmqProducer(String gmqServerUrl) {
        return new GmqProducer(gmqServerUrl);
    }
    
    public static GmqConsumer getGmqConsumer(String gmqServerUrl) {
        return new GmqConsumer(gmqServerUrl);
    }
    
    public static CommonObjCache getCommonCache(String serverUrl) {
        return new CommonObjCacheImpl(serverUrl);
    }
}

 

主要是為了封裝變化。因為之後再迭代的話,實例的具體實現肯定不是目前這麼簡單,所以要儘量讓使用者少關心具體實現。

使用時關心的越多,後續項目迭代肯定越困難。

三、使用示例

1、生產消息

@Test
    public void produceMsg() {
        GmqProducer producer = GmqInstanceManage.getGmqProducer(gmqServerUrl);
        for(int i=0; i<5; i++) {
            String message = "message:" + i;
            try {
                SendMqResult res = producer.sendMq(topic, message);
                System.out.println(res.getRes());
            } catch (SendMqException e) {
                e.printStackTrace();
            }
        }
    }

 

2、消費消息

主要思路是:消費消息之前,先查詢當前已經消費到了哪條消息。消息消費之後,將消費的編號存入緩存。

典型的主動拉消息,消息是否消費由自己負責的模式。

實現如下:

@Test
    public void comsumerMsgByCache() {
        GmqConsumer comsumer = GmqInstanceManage.getGmqConsumer(gmqServerUrl);
        CommonObjCache commonCache = GmqInstanceManage.getCommonCache(gmqServerUrl);
        String gmqSign = "gmq_consumer_id";
        long consumerId = 0;
        int size = 2;
        for(int i=0; i<5; i++) {
            try {
                CacheObj cacheId = commonCache.getById(gmqSign);
                if(cacheId != null) {
                    consumerId = Long.parseLong(cacheId.getContent());
                }
                
                List<MqContent> res = comsumer.getMq(topic, consumerId, size);
                for(MqContent mq : res) {
                    System.out.println(JSONObject.toJSONString(mq));
                    if(mq.getId() > consumerId) {
                        consumerId = mq.getId();
                    }
                }
                commonCache.save(gmqSign, String.valueOf(consumerId));
                System.out.println("保存consumerId:" + consumerId);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

 

四、總結

gmq的初版至今已經完成,當然這隻是開始。

後續計劃先將gmvc框架替換掉,直接使用netty進行通信。

然後把消息存到資料庫改為存到磁碟上。

然後就是服務的高可用改造。

屆時歡迎指導。

第2版設計、開發中……

 

 

 

 

 

 

 

 

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我們在進行項目的多環境配置時,有很多種方式供我們選擇,比如 SpringBoot 自帶的 application dev.yml、maven 的 profile 等。這裡介紹的就是如何利用 profile 進行多環境配置。 首先,在 pom.xml 中添加你需要的 profile 配置: profi ...
  • 本文主要對Java IO相關知識點做了結構性梳理,包括了Java IO的作用,數據源File類,輸入流,輸出流,位元組流,字元流,以及緩衝流,不同場景下的更細化的流操作類型,同時用了一個文件拷貝代碼簡單地說明瞭主要的流操作 ...
  • 新聞 "相遇WebWindow,.NET Core上的跨平臺webview類庫" "使用Bolero在WebAssembly中運行F " "用於你團隊代碼庫的AI輔助IntelliSense" "Jupyter Notebook里的ML.NET" 視頻及幻燈片 "Monads" "使用React,E ...
  • FBV 基於函數的視圖 (function base views) CBV 基於類的視圖 (class base views) 也就是說我們是用函數編寫視圖~還是類編寫視圖~~我們來看下兩個的簡單實現~~ urlpatterns = [ path(‘admin/‘, admin.site.urls) ...
  • 一.hashlib(md5) 1 import hashlib 2 obj = hashlib.md5('dsfd'.encode('utf-8')) 3 obj.update('123'.encode('utf-8')) 4 print(obj.hexdigest()) 二.random 1.ra ...
  • verilog語言簡介 verilog語言是一種語法類似於c的語言,但是與c語言也有不同之處,比如: 1.verilog語言是並行的,每個always塊都是同時執行,而c語言是順序執行的 2.verilog又被稱作硬體描述語言,在用verilog語言編程的時候,不如說是在用verilog描述一段電路 ...
  • [TOC] 靜態文件 預設情況下所有的html文件都是放在templates文件夾內 什麼是靜態文件 網站所使用的提前寫的css、js 第三方的前端模塊、圖片都叫做靜態資源 預設情況下網站使用的靜態資源全部會放到static文件夾下 通常情況下 在static文件夾內部還會再建其他文件夾 這是為了更 ...
  • 目前主流的三種web服務交互方案: REST (Representational State Transfer) 表徵性狀態轉移 SOAP (Simple Object Access Protocol)簡單的對象訪問協議 XML RPC (XML Remote Procedure Call)基於XM ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...