rocketmq 安裝及使用樣例,基本用法及消息類型,web控制台的使用 ...
一: 安裝rocketmq與啟動
虛擬機環境分配4G記憶體,linux centos7
下載連接 http://rocketmq.apache.org/dowloading/releases/
解壓後進入bin目錄,修改記憶體配置,否則啟動失敗(預設配置記憶體8G,記憶體不夠會啟動失敗)
# /home/admin/rocketmqall4.7.0/bin
vi runbroker.sh
# 設置小些
# JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn4g"
# 關閉防火牆或者自行開放埠,避免網路不通
systemctl disable firewalld
# 後臺啟動 nameserver 預設埠9876
nohup sh mqnamesrv &
# 後臺啟動 mqbroker 預設埠 10911
nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true &
#踩坑記錄
#客戶端生產者使用時問題
#報錯說 no router info of this topic
#則通過手動創建 topic, 註意 參數格式 ip:port
sh mqadmin updateTopic -b localhost:10911 -t DemoTopic -n localhost:9876
#手動創建topic時 報錯 簽名演算法問題(擴展包沒找到)
#rocketMQ:unable to calculate a request signature. error=Algorithm HmacSHA1 not available
cd ~/rocketmqall4.7.0/bin
vi tools.sh
#在 ${JAVA_HOME}/jre/lib/ext 後加上ext文件夾的絕對路徑,
#如JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:/usr/java/jdk1.8.0_65/jre/lib/ext"
#再次創建 topic.
#這樣就可以基本使用了
#rocketmq web控制台擴展在文末描述
二: 運行樣例測試下
引入客戶端依賴包
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency>
1、Producer端發送同步消息
這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,簡訊通知。
public class MySyncProducer { public static void main(String[] args) throws Exception { // 實例化消息生產者Producer DefaultMQProducer producer = new DefaultMQProducer("my_SyncProducer"); // 設置NameServer的地址 producer.setNamesrvAddr("192.168.1.114:9876"); // 啟動Producer實例 producer.start(); for (int i = 0; i < 10000; i++) { // 創建消息,並指定Topic,Tag和消息體 Message msg = new Message("my_TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 發送消息到一個Broker SendResult sendResult = producer.send(msg); // 通過sendResult返回消息是否成功送達 System.out.printf("%s%n", sendResult); } // 如果不再發送消息,關閉Producer實例。 Thread.sleep(500000); producer.shutdown(); } }
觀察產生的消息數據 跑了2次產生2w條消息
2w條分佈在4個消息隊列中
消費者消費
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_SyncProducer"); // 設置NameServer的地址 consumer.setNamesrvAddr("192.168.1.114:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息 consumer.subscribe("my_TopicTest", "*"); // 註冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
消費詳情
2、Producer端發送非同步消息
非同步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應
public class AsyncProducer { public static void main(String[] args) throws Exception { // 實例化消息生產者Producer DefaultMQProducer producer = new DefaultMQProducer("my_AsyncProducer"); // 設置NameServer的地址 producer.setNamesrvAddr("192.168.1.114:9876"); // 啟動Producer實例 producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 1000; // 根據消息數量實例化倒計時計算器 final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount); for (int i = 0; i < messageCount; i++) { final int index = i; // 創建消息,並指定Topic,Tag和消息體 Message msg = new Message("my_asyncTopic", "TagA", "AUTO_CREATE_TOPIC_KEY", "Hello world2".getBytes(RemotingHelper.DEFAULT_CHARSET)); // SendCallback接收非同步返回結果的回調 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } // 等待5s countDownLatch.await(5, TimeUnit.SECONDS); // 如果不再發送消息,關閉Producer實例。 producer.shutdown(); } }
3、Producer端單向發送消息
這種方式主要用在不特別關心發送結果的場景,例如日誌發送。
public class OnewayProducer { public static void main(String[] args) throws Exception{ // 實例化消息生產者Producer DefaultMQProducer producer = new DefaultMQProducer("my_OnewayProducer"); // 設置NameServer的地址 producer.setNamesrvAddr("192.168.1.114:9876"); // 啟動Producer實例 producer.start(); for (int i = 0; i < 10000; i++) { // 創建消息,並指定Topic,Tag和消息體 Message msg = new Message("my_OnewayTopic" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 發送單向消息,沒有任何返回結果 producer.sendOneway(msg); } // 如果不再發送消息,關閉Producer實例。 producer.shutdown(); } }
4、 消費消息
消費指定生產組producerGroup及指定topic的消息
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_SyncProducer"); // 設置NameServer的地址 consumer.setNamesrvAddr("192.168.1.114:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息 consumer.subscribe("my_TopicTest", "*"); // 註冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
5、 順序消息樣例
消息生產到一個隊列那麼FIFO,全局中都是有序的消費.如果是分片到多個隊列,每個隊列中都是有序的,分區有序.
順序消息的產生
public class OrderProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("my_OrderProducer"); producer.setNamesrvAddr("192.168.1.114:9876"); producer.start(); String[] tags = new String[]{"TagA", "TagC", "TagD"}; // 訂單列表 List<OrderStep> orderList = new OrderProducer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 0; i < 10; i++) { // 加個時間首碼 String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("my_OrderProducer_Topic", tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; //根據訂單id選擇發送queue long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId());//訂單id System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } /** * 訂單的步驟 */ private static class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } } /** * 生成模擬訂單數據 */ private List<OrderStep> buildOrders() { List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("創建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }OrderProducer
順序消費消息
public class ConsumerInOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_OrderProducer"); consumer.setNamesrvAddr("192.168.1.114:9876"); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("my_OrderProducer_Topic", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); int anInt = random.nextInt(); for (MessageExt msg : msgs) { // 可以看到每個queue有唯一的consume線程來消費, 訂單對每個queue(分區)有序 System.out.println(anInt+" consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模擬業務邏輯處理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }ConsumerInOrder
分區順序消費
6、延時消息樣例
先啟動消費者等待延時消息
public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_ScheduledMessageConsumer"); consumer.setNamesrvAddr("192.168.1.114:9876"); // 訂閱Topics consumer.subscribe("my_Scheduled_Topic", "*"); // 註冊消息監聽者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者 consumer.start(); } }ScheduledMessageConsumer
發送延時消息
public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // 實例化一個生產者來產生延時消息 DefaultMQProducer producer = new DefaultMQProducer("my_ScheduledMessage"); producer.setNamesrvAddr("192.168.1.114:9876"); // 啟動生產者 producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("my_Scheduled_Topic", ("Hello scheduled message " + i).getBytes()); // 設置延時等級3,這個消息將在10s之後發送(現在只支持固定的幾個時間,詳看delayTimeLevel) message.setDelayTimeLevel(3); // 發送消息 producer.send(message); } // 關閉生產者 producer.shutdown(); } }ScheduledMessageProducer
延遲時間結束後消息才放入隊列被消費者消費,消費時間比發送時間晚,
使用場景:如電商里,提交了一個訂單就可以發送一個延時消息,1h後去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。(redis中有個key失效時間,失效事件類似)
使用限制: 時間不是任意的,private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
7、消息事務樣例
事務消息共有三種狀態,提交狀態、回滾狀態、中間狀態:
- TransactionStatus.CommitTransaction: 提交事務,它允許消費者消費此消息。
- TransactionStatus.RollbackTransaction: 回滾事務,它代表該消息將被刪除,不允許被消費。
- TransactionStatus.Unknown: 中間狀態,它代表需要檢查消息隊列來確定狀態。
創建事務性生產者
使用 TransactionMQProducer
類創建生產者,並指定唯一的 ProducerGroup
,就可以設置自定義線程池來處理這些檢查請求。執行本地事務後、需要根據執行結果對消息隊列進行回覆。回傳的事務狀態是以上三種。
public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("my_TransactionMQProducer"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); // 設置NameServer的地址 producer.setNamesrvAddr("192.168.1.114:9876"); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("my_transaction_msg_topic", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); }TransactionProducer
事務性消息的監聽
static class TransactionListenerImpl implements TransactionListener{ private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }TransactionListenerImpl
事務消息使用上的限制
- 事務消息不支持延時消息和批量消息。
- 為了避免單個消息被檢查太多次而導致半隊列消息累積,我們預設將單個消息的檢查次數限製為 15 次,但是用戶可以通過 Broker 配置文件的
transactionCheckMax
參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N =transactionCheckMax
) 則 Broker 將丟棄此消息,併在預設情況下同時列印錯誤日誌。用戶可以通過重寫AbstractTransactionCheckListener
類來修改這個行為。 - 事務消息將在 Broker 配置文件中的參數 transactionMsgTimeout 這樣的特定時間長度之後被檢查。當發送事務消息時,用戶還可以通過設置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先於
transactionMsgTimeout
參數。 - 事務性消息可能不止一次被檢查或消費。
- 提交給用戶的目標主題消息可能會失敗,目前這依日誌的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務消息不丟失、並且事務完整性得到保證,建議使用同步的雙重寫入機制。
- 事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共用。與其他類型的消息不同,事務消息允許反向查詢、MQ伺服器能通過它們的生產者 ID 查詢到消費者
8、批量消息樣例
9、過濾消息樣例
0、OpenMessaging樣例
官方鏈接 https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
修改配置文件 namesrvAddr地址後打包運行
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.1.jar