RocketMQ消費端 今天要來跟大家學習怎樣使用RocketMQ來進行消息的消費 先簡單創建個Maven項目使用 添加依賴 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifa ...
RocketMQ消費端
今天要來跟大家學習怎樣使用RocketMQ來進行消息的消費
先簡單創建個Maven項目使用
- 添加依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
-
啟動消費者
package mq.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; public class BroadcastConsumer { public static void main(String[] args) throws MQClientException { //創建一個push模式的消費組 DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("pushConsumer"); pushConsumer.setNamesrvAddr("localhost:9876"); //集群模式 pushConsumer.setMessageModel(MessageModel.CLUSTERING); // 訂閱的topic tag pushConsumer.subscribe("topic_test01","Tag1 || Tag2"); pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); pushConsumer.start(); System.out.printf("Broadcast Consumer Started.%n"); }
-
啟動生產者
package mq.producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; public class SyncProducerV2 { /** * 同步消息發送 * * @param args * @throws MQClientException * @throws MQBrokerException * @throws RemotingException * @throws InterruptedException * @throws UnsupportedEncodingException */ public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException, UnsupportedEncodingException { System.out.println("SyncProducer start......"); DefaultMQProducer defaultMQProducer = new DefaultMQProducer("pg_sync_01"); defaultMQProducer.setNamesrvAddr("localhost:9876"); defaultMQProducer.start(); for (int i = 0; i < 10; i++) { send(defaultMQProducer, i, i % 3); } defaultMQProducer.shutdown(); System.out.println("SyncProducer end......"); } private static void send(DefaultMQProducer defaultMQProducer, Integer i, int tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException { SendResult sendResult = defaultMQProducer.send(new Message("topic_test01", "Tag" + tag, ("hello this is sync message_" + i + "!").getBytes(RemotingHelper.DEFAULT_CHARSET))); System.out.println(sendResult); } }
-
消費者消費
可以看到消費了Tag為Tag1、Tag2的消息
其它Tag會被過濾掉
消費分類
RocketMQ的消費模式分為兩種:BROADCASTING(廣播)和CLUSTERING(集群)
那這兩種模式有什麼區別呢?
- 廣播:相同消費組下的實例會重覆消費同一個Topic的消息,可以理解為大家做同樣的工作,消費進度存儲在客戶端,有可能會導致部分消息沒有被消費
- 集群:相同消費組下的實例會負載均衡地消費同一個Topic的消息,可以理解為分工合作,消費進度存儲在Broker端
所以大部分系統都會使用集群模式去消費信息,畢竟可以水平拓展消費者來承受更大的消費壓力
廣播模式相對來說使用比較少,一般都是一些消息通知同步的場景,想同步刷新緩存等
本文由博客一文多發平臺 OpenWrite 發佈!