導論 記錄一下阿裡雲消息服務與Spring的整合,以及ProducerId與ConsumerId的管理,其他的消息服務也是類似(RocketMQ、Kafka),阿裡雲消息服務性能還是很可觀的,雖然收費,單也推薦使用。 整合 消息服務的概念就不想多說了,需要的可以去看官方文檔, "參考文檔" 。 創建 ...
導論
記錄一下阿裡雲消息服務與Spring的整合,以及ProducerId與ConsumerId的管理,其他的消息服務也是類似(RocketMQ、Kafka),阿裡雲消息服務性能還是很可觀的,雖然收費,單也推薦使用。
整合
消息服務的概念就不想多說了,需要的可以去看官方文檔,參考文檔。
創建topic
首先創建topic,如下圖填好信息就OK了。
創建成功之後是這樣
ProducerId的創建
ConsumerId的創建
整合spring
上面那些步驟信息填完整之後topic、ProducerId、ConsumerId都創建好了就可以使用消息隊列了
Producer的整合
<bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start"
destroy-method="shutdown">
<property name="properties"> <!--生產者配置信息-->
<props>
<!-- 生成者ID,需要提前在阿裡雲創建 -->
<prop key="ProducerId">PID-SIT-TransitHub-NotifyUnbind</prop> <!--請替換為自己的賬戶信息-->
<!-- AccessKey、SecretKey由阿裡雲分配 -->
<prop key="AccessKey">LTAIqfzogBNFeohh11</prop>
<prop key="SecretKey">zoahuhZKscEk5Q8Qtr</prop>
<!-- 根據自己伺服器選擇不同的tcp接入url,此處選擇公網 -->
<prop key="ONSAddr">http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet</prop>
</props>
</property>
</bean>
Consumer的整合
<!-- 創建Listener將消費者處於阻塞狀態,只要有自己topic訂閱的消息發佈消息馬上就會訂閱到-->
<bean id="tsmDeleteAidMsgListener" class="com.snowball.hub.msg.DataMessageListener" /> Listener配置
<bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean"
init-method="start" destroy-method="shutdown">
<property name="properties">
<props>
<prop key="ConsumerId">CID-SIT-OPS-NotifyUnbind</prop>
<prop key="AccessKey">${access_key}</prop>
<prop key="SecretKey">${secret_key}</prop>
<!--將消費者線程數固定為50個,該線程不會和主業務線程耦合-->
<prop key="ConsumeThreadNums">50</prop>
</props>
</property>
<property name="subscriptionTable">
<map>
<entry value-ref="tsmDeleteAidMsgListener">
<key>
<bean class="com.aliyun.openservices.ons.api.bean.Subscription">
<!-- 此處填將之前創建的topic -->
<property name="topic" value="snb-test-topic4" />
<property name="expression" value="*" />
<!--
expression即Tag,可以設置成具體的Tag,如 taga||tagb||tagc,也可設置成*。 *僅代表訂閱所有Tag,不支持通配
-->
</bean>
</key>
</entry>
更多的訂閱添加entry節點即可
</map>
</property>
</bean>
Consumer的整合和Producer基本一致,不同的是需要創建一個Listener,作用已經在註釋中說明。
使用阿裡雲sdk發佈和訂閱消息
上面只是整合了普通消息,阿裡雲MQ消息分很四種,每一種的整合API都不一樣,具體整合細節可以參考文章開始出的參考文檔。
發佈消息
public class ProducerTest {
//如果和spring整合了,那就直接註入就好了,本次使用傳統的發佈方式
//@Autowired
//private Producer producer;
//topic的管理最好做成可配置,可以對應不同的環境管理不同的topic,本次還是使用傳統的//方式發佈
//@Value("#{configProperties['send_unbind_topic']}")
//private String send_unbind_topic;
public static void main(String[] args) {
Properties properties = new Properties();
// 您在MQ控制台創建的Producer ID
properties.put(PropertyKeyConst.ProducerId, "XXX");
// 鑒權用AccessKey,在阿裡雲伺服器管理控制台創建
properties.put(PropertyKeyConst.AccessKey,"XXX");
// 鑒權用SecretKey,在阿裡雲伺服器管理控制台創建
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 設置 TCP 接入功能變數名稱(此處以公共雲的公網接入為例)
properties.put(PropertyKeyConst.ONSAddr,
"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可
producer.start();
//迴圈發送消息
while(true){
Message msg = new Message( //
// 在控制台創建的Topic,即該消息所屬的Topic名稱
"TopicTestMQ",
// Message Tag,
// 可理解為Gmail中的標簽,對消息進行再歸類,方便Consumer指定過濾條件在MQ伺服器過濾
"TagA",
// Message Body
// 任何二進位形式的數據, MQ不做任何干預,
// 需要Producer與Consumer協商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 設置代表消息的業務關鍵屬性,請儘可能全局唯一,以方便您在無法正常收到消息情況下,可通過MQ控制台查詢消息並補發
// 註意:不設置也不會影響消息正常收發
msg.setKey("ORDERID_100");
// 發送消息,只要不拋異常就是成功
// 列印Message ID,以便用於消息發送狀態查詢
SendResult sendResult = producer.send(msg);
System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
}
// 在應用退出前,可以銷毀Producer對象
// 註意:如果不銷毀也沒有問題
producer.shutdown();
}
}
消息發佈成功可以看到sendResult是這樣的信息
{"messageId":"0200010546D011E87BD078ACF4180003","topic":"TPC-SIT-COM-TransitHub-NotifyUnbind"}
根據messageId可以定位這條消息的軌跡,可以很清晰的定位消息的消費軌跡。
訂閱消息
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在MQ控制台創建的Consumer ID
properties.put(PropertyKeyConst.ConsumerId, "XXX");
// 鑒權用AccessKey,在阿裡雲伺服器管理控制台創建
properties.put(PropertyKeyConst.AccessKey, "XXX");
// 鑒權用SecretKey,在阿裡雲伺服器管理控制台創建
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 設置 TCP 接入功能變數名稱(此處以公共雲公網環境接入為例)
properties.put(PropertyKeyConst.ONSAddr,
"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Consumer consumer = ONSFactory.createConsumer(properties);
//這個Listener如果之前已經在spring容器中註冊過直接使用就好了,這裡就不演示了
consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
消息的發佈與訂閱就這麼多,要使用消息服務總結起來就四步。
- 開通服務
- 申請資源
- 發佈消息
- 訂閱消息
總結
消息的產品很多,阿裡雲的消息服務是目前互聯網公司使用占比很大的,本次只是很簡單介紹消息服務的使用,具體實現細節筆者也在學習中。