有以下3種方式發送RocketMQ消息 可靠同步發送 reliable synchronous 可靠非同步發送 reliable asynchronous 單向發送 one-way transmission 可靠同步發送 主要運用在比較重要一點消息傳遞/通知等業務 可靠非同步發送 通常用於對發送消息響應 ...
有以下3種方式發送RocketMQ消息
- 可靠同步發送 reliable synchronous
- 可靠非同步發送 reliable asynchronous
- 單向發送 one-way transmission
可靠同步發送
主要運用在比較重要一點消息傳遞/通知等業務
public class SyncProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("test"); producer.start(); for (int i = 0; i < 100; i++) { Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
可靠非同步發送
通常用於對發送消息響應時間要求更高/更快的場景
public class AsyncProducer { public static void main( String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 10000000; i++) { try { final int index = i; Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); //重點在這裡 非同步發送回調 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
單向發送
適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日誌收集。
只發送消息,不等待伺服器響應,只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。
public class OnewayProducer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer = new DefaultMQProducer("Test"); producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }