使用java實現阿裡雲消息隊列簡單封裝

来源:https://www.cnblogs.com/kuangdaoyizhimei/archive/2018/03/06/8508357.html
-Advertisement-
Play Games

一、前言 最近公司有使用阿裡雲消息隊列的需求,為了更加方便使用,本人用了幾天時間將消息隊列封裝成api調用方式以方便內部系統的調用,現在已經完成,特此記錄其中過程和使用到的相關技術,與君共勉。 現在阿裡雲提供了兩種消息服務:mns服務和ons服務,其中我認為mns是簡化版的ons,而且mns的消息消 ...


一、前言

最近公司有使用阿裡雲消息隊列的需求,為了更加方便使用,本人用了幾天時間將消息隊列封裝成api調用方式以方便內部系統的調用,現在已經完成,特此記錄其中過程和使用到的相關技術,與君共勉。

現在阿裡雲提供了兩種消息服務:mns服務和ons服務,其中我認為mns是簡化版的ons,而且mns的消息消費需要自定義輪詢策略的,相比之下,ons的發佈與訂閱模式功能更加強大(比如相對於mns,ons提供了消息追蹤、日誌、監控等功能),其api使用起來更加方便,而且聽聞阿裡內部以後不再對mns進行新的開發,只做維護,ons服務則會逐步替代mns服務成為阿裡消息服務的主打產品,所以,如果有使用消息隊列的需求,建議不要再使用mns,使用ons是最好的選擇。

參考文檔:https://m.aliyun.com/doc/product/29530.html

涉及到的技術:Spring,反射、動態代理、Jackson序列化和反序列化

在看下麵的文章之前,需要先看上面的文檔以瞭解相關概念(Topic、Consumer、Producer、Tag等)以及文檔中提供的簡單的發送和接收代碼實現。

該博文只針對有消息隊列知識基礎的朋友看,能幫上大家的忙我自然很高興,看不懂的也不要罵,說明你路子不對。

 

二、設計方案

1.消息發送

在一個簡單的cs架構中,假設server會監聽一個Topic的Producer發送的消息,那麼它首先應該提供client一個api,client只需要簡單的調用該api,就可以通過producer來生產消息

2.消息接收

由於api是server制定的,所以server當然也知道如何消費這些消息

在這個過程中,server實際充當著消費者的角色,client實際充當著生產者的角色,但是生產者生產消息的規則則由消費者制定以滿足消費者消費需求。

3.最終目標

我們要創建一個單獨的jar包,起名為queue-core為生產者和消費者提供依賴和發佈訂閱的具體實現。

 

三、消息發送

1.消費者提供介面

@Topic(name="kdyzm",producerId="kdyzm_producer")
public interface UserQueueResource {
    
    @Tag("test1")
    public void handleUserInfo(@Body @Key("userInfoHandler") UserModel user);
    
    @Tag("test2")
    public void handleUserInfo1(@Body @Key("userInfoHandler1") UserModel user);
}

由於Topic和producer之間是N:1的關係,所以這裡直接將producerId作為Topic的一個屬性;Tag是一個很關鍵的過濾條件,消費者通過它進行消息的分類做不同的業務處理,所以,這裡使用Tag作為路由條件。

2.生產者使用消費者提供的api發送消息

 由於消費者只提供了介面給生產者使用,介面是沒有辦法直接使用的,因為沒有辦法實例化,這裡使用動態代理生成對象,在消費者提供的api中,添加如下config,以方便生產者直接導入config即可使用,這裡使用了基於java的spring config,請知悉。

@Configuration
public class QueueConfig {

    @Autowired
    @Bean
    public UserQueueResource userQueueResource() {
        return QueueResourceFactory.createProxyQueueResource(UserQueueResource.class);
    }
}

3.queue-core對生產者發送消息的封裝

以上1中所有的註解(Topic、Tag、Body 、Key)以及2中使用到的QueueResourceFactory類都要在queue-core中定義,其中註解的定義只是定義了規則,真正的實現實際上是在QueueResourceFactory中

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.wy.queue.core.api.MQConnection;
import com.wy.queue.core.utils.JacksonSerializer;
import com.wy.queue.core.utils.MQUtils;
import com.wy.queue.core.utils.QueueCoreSpringUtils;

public class QueueResourceFactory implements InvocationHandler {

    private static final Logger logger=LoggerFactory.getLogger(QueueResourceFactory.class);
    
    private String topicName;

    private String producerId;
    
    private JacksonSerializer serializer=new JacksonSerializer();
    
    private static final String PREFIX="PID_";
    
    public QueueResourceFactory(String topicName,String producerId) {
        this.topicName = topicName;
        this.producerId=producerId;
    }

    public static <T> T createProxyQueueResource(Class<T> clazz) {
        String topicName = MQUtils.getTopicName(clazz);
        String producerId = MQUtils.getProducerId(clazz);
        T target = (T) Proxy.newProxyInstance(QueueResourceFactory.class.getClassLoader(),
                new Class<?>[] { clazz }, new QueueResourceFactory(topicName,producerId));
        return target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if(args.length == 0 || args.length>1){
            throw new RuntimeException("only accept one param at queueResource interface.");
        }
        String tagName=MQUtils.getTagName(method);
        ProducerFactory producerFactory = QueueCoreSpringUtils.getBean(ProducerFactory.class);
        MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);
        
        Producer producer = producerFactory.createProducer(PREFIX+connectionInfo.getPrefix()+"_"+producerId);
        
        //發送消息
        Message msg = new Message( //
                // 在控制台創建的 Topic,即該消息所屬的 Topic 名稱
                connectionInfo.getPrefix()+"_"+topicName,
                // Message Tag,
                // 可理解為 Gmail 中的標簽,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 伺服器過濾
                tagName,
                // Message Body
                // 任何二進位形式的數據, MQ 不做任何干預,
                // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
                serializer.serialize(args[0]).getBytes());
        SendResult sendResult = producer.send(msg);
        logger.info("Send Message success. Message ID is: " + sendResult.getMessageId());
        return null;
    }
    
}

這裡特意將自定義包和第三方使用的包名都貼過來了,以便於區分。

這裡到底做了哪些事情呢?

發送消息的過程就是動態代理創建一個代理對象,該對象調用方法的時候會被攔截,首先解析所有的註解,比如topicName、producerId、tag等關鍵信息從註解中取出來,然後調用阿裡sdk發送消息,過程很簡單,但是註意,這裡發送消息的時候是分環境的,一般來講現在企業中會區分QA、staging、product三種環境,其中QA和staging是測試環境,對於消息隊列來講,也是會有三種環境的,但是QA和staging環境往往為了降低成本使用同一個阿裡賬號,所以創建的topic和productId會放到同一個區域下,這樣同名的TopicName是不允許存在的,所以加上了環境首碼加以區分,比如QA_TopicName,PID_Staging_ProducerId等等;另外,queue-core提供了MQConnection介面,以獲取配置信息,生產者服務只需要實現該介面即可。

 

4.生產者發送消息

    @Autowired
    private UserQueueResource userQueueResource;
    
    @Override
    public void sendMessage() {
        UserModel userModel=new UserModel();
        userModel.setName("kdyzm");
        userModel.setAge(25);
        userQueueResource.handleUserInfo(userModel);
    }

只需要數行代碼即可將消息發送到指定的Topic,相對於原生的發送代碼,精簡了太多。

四、消息消費

 相對於消息發送,消息的消費要複雜一些。

1.消息消費設計

由於Topic和Consumer之間是N:N的關係,所以將ConsumerId放到消費者具體實現的方法上

@Controller
@QueueResource
public class UserQueueResourceImpl implements UserQueueResource {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    @ConsumerAnnotation("kdyzm_consumer")
    public void handleUserInfo(UserModel user) {
        logger.info("收到消息1:{}", new Gson().toJson(user));
    }

    @Override
    @ConsumerAnnotation("kdyzm_consumer1")
    public void handleUserInfo1(UserModel user) {
        logger.info("收到消息2:{}", new Gson().toJson(user));
    }

}

這裡又有兩個新的註解@QueueResource和@ConsumerAnnotation,這兩個註解後續會討論如何使用。有人會問我為什麼要使用ConsumerAnnotation這個名字而不使用Consumer這個名字,因為Consumer這個名字和aliyun提供的sdk中的名字衝突了。。。。

在這裡, 消費者提供api 介面給生產者以方便生產者發送消息,消費者則實現該介面以消費生產者發送的消息,如何實現api介面就實現了監聽,這點是比較關鍵的邏輯。

2.queue-core實現消息隊列監聽核心邏輯

第一步:使用sping 容器的監聽方法獲取所有加上QueueResource註解的Bean

第二步:分發處理Bean

如何處理這些Bean呢,每個Bean實際上都是一個對象,有了對象,比如上面例子中的UserQueueResourceImpl 對象,我們可以拿到該對象實現的介面位元組碼對象,進而可以拿到該介面UserQueueRerousce上的註解以及方法上和方法中的註解,當然UserQueueResourceImpl實現方法上的註解也能拿得到,這裡我將獲取到的信息以consumerId為key,其餘相關信息封裝為Value緩存到了一個Map對象中,核心代碼如下:

Class<?> clazz = resourceImpl.getClass();
        Class<?> clazzIf = clazz.getInterfaces()[0];
        Method[] methods = clazz.getMethods();
        String topicName = MQUtils.getTopicName(clazzIf);
        for (Method m : methods) {
            ConsumerAnnotation consumerAnno = m.getAnnotation(ConsumerAnnotation.class);

            if (null == consumerAnno) {
//                logger.error("method={} need Consumer annotation.", m.getName());
                continue;
            }
            String consuerId = consumerAnno.value();
            if (StringUtils.isEmpty(consuerId)) {
                logger.error("method={} ConsumerId can't be null", m.getName());
                continue;
            }
            Class<?>[] parameterTypes = m.getParameterTypes();
            Method resourceIfMethod = null;
            try {
                resourceIfMethod = clazzIf.getMethod(m.getName(), parameterTypes);
            } catch (NoSuchMethodException | SecurityException e) {
                logger.error("can't find method={} at super interface={} .", m.getName(), clazzIf.getCanonicalName(),
                        e);
                continue;
            }
            String tagName = MQUtils.getTagName(resourceIfMethod);
            consumersMap.put(consuerId, new MethodInfo(topicName, tagName, m));
        }

第三步:通過反射實現消費的動作

首先,先確定好反射動作執行的時機,那就是監聽到了新的消息

其次,如何執行反射動作?不贅述,有反射相關基礎的童鞋都知道怎麼做,核心代碼如下所示:

MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);
        String topicPrefix=connectionInfo.getPrefix()+"_";
        String consumerIdPrefix=PREFIX+connectionInfo.getPrefix()+"_";
        for(String consumerId:consumersMap.keySet()){
            MethodInfo methodInfo=consumersMap.get(consumerId);
            Properties connectionProperties=convertToProperties(connectionInfo);
            // 您在控制台創建的 Consumer ID
            connectionProperties.put(PropertyKeyConst.ConsumerId, consumerIdPrefix+consumerId);
            Consumer consumer = ONSFactory.createConsumer(connectionProperties);
            consumer.subscribe(topicPrefix+methodInfo.getTopicName(), methodInfo.getTagName(), new MessageListener() { //訂閱多個Tag
                public Action consume(Message message, ConsumeContext context) {
                    try {
                        String messageBody=new String(message.getBody(),"UTF-8");
                        logger.info("receive message from topic={},tag={},consumerId={},message={}",topicPrefix+methodInfo.getTopicName(),methodInfo.getTagName(),consumerIdPrefix+consumerId,messageBody);
                        Method method=methodInfo.getMethod();
                        Class<?> parameType = method.getParameterTypes()[0];
                        Object arg = jacksonSerializer.deserialize(messageBody, parameType);
                        Object[] args={arg};
                        method.invoke(resourceImpl, args);
                    } catch (Exception e) {
                        logger.error("",e);
                    }
                    return Action.CommitMessage;
                }
            });
            consumer.start();
            logger.info("consumer={} has started.",consumerIdPrefix+consumerId);
        }

五、完整代碼見下麵的git鏈接

 https://github.com/kdyzm/queue-core.git


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Flask是一個輕量級的Web服務程式,它簡單、易用、靈活,這裡主要用來做一些API服務。 1. 相關鏈接 GitHub:https://github.com/pallets/flask 官方文檔:http://flask.pocoo.org 中文文檔:http://docs.jinkan.org/ ...
  • 2018-03-07 一、什麼是變數 變數的定義是: 用來命名一個數據的標示符1949 這是一個數字,代表某年如果要命名這個數字,在java里就會寫成: int 是數據類型,表示是整數year 是一個標識符= 是賦值操作符1949 是一個數字類型的值; 表示該行結束year 這個標識符就是一個變數, ...
  • 在網上找個很多的答案,但我的問題沒有解決,睡一晚上後,被我誤打誤撞地解決了,獻給遇到同樣問題的朋友。 方法一(eclipse): 網上大神的回答: 自己寫的程式是不建議用com.sun這個玩意兒的。。這東西屬於“Deprecated and restricted API”。。 而且各種com.sun ...
  • comparable 介面 Comparable<T> 類型參數:T - 可以與此對象進行比較的那些對象的類型 此介面強行對實現它的每個類的對象進行整體排序。這種排序被稱為類的自然排序,類的 compareTo 方法被稱為它的自然比較方法。 實現此介面的對象列表(和數組)可以通過 Collectio ...
  • 原文鏈接:https://www.cnblogs.com/shanheyongmu/p/5863961.html 1. 什麼是逆向工程 mybatis的一個主要的特點就是需要程式員自己編寫sql,那麼如果表太多的話,難免會很麻煩,所以mybatis官方提供了一個逆向工程,可以針對單表自動生成myba ...
  • 一、現象:in a frame because it set 'X-Frame-Options' to 'deny'. 二、服務配置 因為,有時候為了防止網頁被別人的網站iframe,我們可以通過在服務端設置HTTP頭部中的X-Frame-Options信息。 X-Frame-Options 響應頭 ...
  • Python操作MySQL主要使用兩種方式: 原生模塊 pymsql ORM框架 SQLAchemy pymql pymsql是Python中操作MySQL的模塊,在windows中的安裝: 入門:我們連接虛擬機中的centos中的mysql,然後查詢test資料庫中student表的數據 運行結果 ...
  • 第四章 模板 1.標簽 (1)if/else {% if %} 標簽檢查(evaluate)一個變數,如果這個變數為真(即,變數存在,非空,不是布爾值假),系統會顯示在 {% if %} 和 {% endif %} 之間的任何內容,例如: {% else %} 標簽是可選的: {% if %} 標簽 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...