[TOC] 在公司里一直在用RabbitMQ,由於api已經封裝的很簡單,關於RabbitMQ本身還有封裝的實現沒有瞭解,最近在看RabbitMQ實戰這本書,結合網上的一些例子和spring文檔,實現了RabbitMQ和spring的集成,對著自己平時的疑惑做了一些總結。 關於RabbitMQ基礎不 ...
目錄
- 交換器、隊列、綁定的聲明
- 關於消息序列化
- 同一個隊列多消費類型
- 註解將消息和消息頭註入消費者方法
- 關於消費者確認
- 關於發送者確認模式
- 消費消息、死信隊列和RetryTemplate
- RPC模式的消息(不常用)
- 關於消費模型
- 關於RabbitMq客戶端的線程模型
在公司里一直在用RabbitMQ,由於api已經封裝的很簡單,關於RabbitMQ本身還有封裝的實現沒有瞭解,最近在看RabbitMQ實戰這本書,結合網上的一些例子和spring文檔,實現了RabbitMQ和spring的集成,對著自己平時的疑惑做了一些總結。
關於RabbitMQ基礎不在詳細講解(本文不適合RabbitMq零基礎),RabbitMQ實戰的1,2,4三章講的非常不錯。因為書中講的都是Python和Php的例子,所以自己結合SpringBoot文檔和朱小廝的博客做了一些總結,寫了一些Springboot的例子。
交換器、隊列、綁定的聲明
SpringAMQP項目對RabbitMQ做了很好的封裝,可以很方便的手動聲明隊列,交換器,綁定。如下:
/**
* 隊列
* @return
*/
@Bean
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE)
Queue queue() {
return new Queue(RabbitMQConstant.PROGRAMMATICALLY_QUEUE, false, false, true);
}
/**
* 交換器
* @return
*/
@Bean
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE)
TopicExchange exchange() {
return new TopicExchange(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE, false, true);
}
/**
* 聲明綁定關係
* @return
*/
@Bean
Binding binding(@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange,
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitMQConstant.PROGRAMMATICALLY_KEY);
}
/**
* 聲明簡單的消費者,接收到的都是原始的{@link Message}
*
* @param connectionFactory
*
* @return
*/
@Bean
SimpleMessageListenerContainer simpleContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setMessageListener(message -> log.info("simple receiver,message:{}", message));
container.setQueueNames(RabbitMQConstant.PROGRAMMATICALLY_QUEUE);
return container;
}
消費者和生產者都可以聲明,交換器這種一般經常創建,可以手動創建。需要註意對於沒有路由到隊列的消息會被丟棄。
如果是Spring的話還需要聲明連接:
@Bean
ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.username}") String userName,
@Value("${spring.rabbitmq.password}") String password,
@Value("${spring.rabbitmq.publisher-confirms}") boolean isConfirm,
@Value("${spring.rabbitmq.virtual-host}") String vhost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPort(port);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(isConfirm);
}
在配置類使用@EnableRabbit
的情況下,也可以基於註解進行聲明,在Bean的方法上加上@RabbitListener
,如下:
/**
* 可以直接通過註解聲明交換器、綁定、隊列。但是如果聲明的和rabbitMq中已經存在的不一致的話
* 會報錯便於測試,我這裡都是不使用持久化,沒有消費者之後自動刪除
* {@link RabbitListener}是可以重覆的。並且聲明隊列綁定的key也可以有多個.
*
* @param headers
* @param msg
*/
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = DKEY
),
//手動指明消費者的監聽容器,預設Spring為自動生成一個SimpleMessageListenerContainer
containerFactory = "container",
//指定消費者的線程數量,一個線程會打開一個Channel,一個隊列上的消息只會被消費一次(不考慮消息重新入隊列的情況),下麵的表示至少開啟5個線程,最多10個。線程的數目需要根據你的任務來決定,如果是計算密集型,線程的數目就應該少一些
concurrency = "5-10"
)
public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
}
/**
* {@link Queue#ignoreDeclarationExceptions}聲明隊列會忽略錯誤不聲明隊列,這個消費者仍然是可用的
*
* @param headers
* @param msg
*/
@RabbitListener(queuesToDeclare = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = RabbitMQConstant.true_CONSTANT))
public void process2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
}
關於消息序列化
這個比較簡單,預設採用了Java序列化,我們一般使用的Json格式,所以配置了Jackson,根據自己的情況來,直接貼代碼:
@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
同一個隊列多消費類型
如果是同一個隊列多個消費類型那麼就需要針對每種類型提供一個消費方法,否則找不到匹配的方法會報錯,如下:
@Component
@Slf4j
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.MULTIPART_HANDLE_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.MULTIPART_HANDLE_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = RabbitMQConstant.MULTIPART_HANDLE_KEY
)
)
@Profile(SpringConstant.MULTIPART_PROFILE)
public class MultipartConsumer {
/**
* RabbitHandler用於有多個方法時但是參數類型不能一樣,否則會報錯
*
* @param msg
*/
@RabbitHandler
public void process(ExampleEvent msg) {
log.info("param:{msg = [" + msg + "]} info:");
}
@RabbitHandler
public void processMessage2(ExampleEvent2 msg) {
log.info("param:{msg2 = [" + msg + "]} info:");
}
/**
* 下麵的多個消費者,消費的類型不一樣沒事,不會被調用,但是如果缺了相應消息的處理Handler則會報錯
*
* @param msg
*/
@RabbitHandler
public void processMessage3(ExampleEvent3 msg) {
log.info("param:{msg3 = [" + msg + "]} info:");
}
}
註解將消息和消息頭註入消費者方法
在上面也看到了@Payload
等註解用於註入消息。這些註解有:
- @Header 註入消息頭的單個屬性
- @Payload 註入消息體到一個JavaBean中
- @Headers 註入所有消息頭到一個Map中
這裡有一點主要註意,如果是com.rabbitmq.client.Channel
,org.springframework.amqp.core.Message
和org.springframework.messaging.Message
這些類型,可以不加註解,直接可以註入。
如果不是這些類型,那麼不加註解的參數將會被當做消息體。不能多於一個消息體。如下方法ExampleEvent就是預設的消息體:
public void process2(@Headers Map<String, Object> headers,ExampleEvent msg);
關於消費者確認
RabbitMq消費者可以選擇手動和自動確認兩種模式,如果是自動,消息已到達隊列,RabbitMq對無腦的將消息拋給消費者,一旦發送成功,他會認為消費者已經成功接收,在RabbitMq內部就把消息給刪除了。另外一種就是手動模式,手動模式需要消費者對每條消息進行確認(也可以批量確認),RabbitMq發送完消息之後,會進入到一個待確認(unacked)的隊列,如下圖紅框部分:
如果消費者發送了ack,RabbitMq將會把這條消息從待確認中刪除。如果是nack並且指明不要重新入隊列,那麼該消息也會刪除。但是如果是nack且指明瞭重新入隊列那麼這條消息將會入隊列,然後重新發送給消費者,被重新投遞的消息消息頭amqp_redelivered屬性會被設置成true,客戶端可以依靠這點來判斷消息是否被確認,可以好好利用這一點,如果每次都重新回隊列會導致同一消息不停的被髮送和拒絕。消費者在確認消息之前和RabbitMq失去了連接那麼消息也會被重新投遞。所以手動確認模式很大程度上提高可靠性。自動模式的消息可以提高吞吐量。
spring手動確認消息需要將SimpleRabbitListenerContainerFactory
設置為手動模式:
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
手動確認的消費者代碼如下:
@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.CONFIRM_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.CONFIRM_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = RabbitMQConstant.CONFIRM_KEY),
containerFactory = "containerWithConfirm")
public void process(ExampleEvent msg, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag,
@Header("amqp_redelivered") boolean redelivered, @Headers Map<String, String> head) {
try {
log.info("ConsumerWithConfirm receive message:{},header:{}", msg, head);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("consume confirm error!", e);
//這一步千萬不要忘記,不會會導致消息未確認,消息到達連接的qos之後便不能再接收新消息
//一般重試肯定的有次數,這裡簡單的根據是否已經重發過來來決定重發。第二個參數表示是否重新分發
channel.basicReject(deliveryTag, !redelivered);
//這個方法我知道的是比上面多一個批量確認的參數
// channel.basicNack(deliveryTag, false,!redelivered);
}
}
關於spring的AcknowledgeMode需要說明,他一共有三種模式:NONE,MANUAL,AUTO,預設是AUTO模式。這比RabbitMq原生多了一種。這一點很容易混淆,這裡的NONE對應其實就是RabbitMq的自動確認,MANUAL是手動。而AUTO其實也是手動模式,只不過是Spring的一層封裝,他根據你方法執行的結果自動幫你發送ack和nack。如果方法未拋出異常,則發送ack。如果方法拋出異常,並且不是AmqpRejectAndDontRequeueException
則發送nack,並且重新入隊列。如果拋出異常時AmqpRejectAndDontRequeueException
則發送nack不會重新入隊列。我有一個例子專門測試NONE,見CunsumerWithNoneTest
。
還有一點需要註意的是消費者有一個參數prefetch,它表示的是一個
關於發送者確認模式
考慮這樣一個場景:你發送了一個消息給RabbitMq,RabbitMq接收了但是存入磁碟之前伺服器就掛了,消息也就丟了。為了保證消息的投遞有兩種解決方案,最保險的就是事務(和DB的事務沒有太大的可比性), 但是因為事務會極大的降低性能,會導致生產者和RabbitMq之間產生同步(等待確認),這也違背了我們使用RabbitMq的初衷。所以一般很少採用,這就引入第二種方案:發送者確認模式。
發送者確認模式是指發送方發送的消息都帶有一個id,RabbitMq會將消息持久化到磁碟之後通知生產者消息已經成功投遞,如果因為RabbitMq內部的錯誤會發送ack。註意這裡的發送者和RabbitMq之間是非同步的,所以相較於事務機制性能大大提高。其實很多操作都是不能保證絕對的百分之一百的成功,哪怕採用了事務也是如此,可靠性和性能很多時候需要做一些取捨,想很多互聯網公司吹噓的5個9,6個9也是一樣的道理。如果不是重要的消息性能計數器,完全可以不採用發送者確認模式。
這裡有一點我當時糾結了很久,我一直以為發送者確認模式的回調是客戶端的ack觸發的,這裡是大大的誤解!發送者確認模式和消費者沒有一點關係,消費者確認也和發送者沒有一點關係,兩者都是在和RabbitMq打交道,發送者不會管消費者有沒有收到,只要消息到了RabbitMq並且已經持久化便會通知生產者,這個ack是RabbitMq本身發出的,和消費者無關
發送者確認模式需要將Channel設置成Confirm模式,這樣才會收到通知。Spring中需要將連接設置成Confirm模式:
connectionFactory.setPublisherConfirms(isConfirm);
然後在RabbitTemplate中設置確認的回調,correlationData是消息的id,如下(只是簡單列印下):
// 設置RabbitTemplate每次發送消息都會回調這個方法
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)
-> log.info("confirm callback id:{},ack:{},cause:{}", correlationData, ack, cause));
發送時需要給出唯一的標識(CorrelationData
):
rabbitTemplateWithConfirm.convertAndSend(RabbitMQConstant.DEFAULT_EXCHANGE, RabbitMQConstant.DEFAULT_KEY,
new ExampleEvent(i, "confirm message id:" + i),
new CorrelationData(Integer.toString(i)));
還有一個參數需要說下:mandatory。這個參數為true表示如果發送消息到了RabbitMq,沒有對應該消息的隊列。那麼會將消息返回給生產者,此時仍然會發送ack確認消息。
設置RabbitTemplate的回調如下:
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)
-> log.info("return callback message:{},code:{},text:{}", message, replyCode, replyText));
另外如果是RabbitMq內部的錯誤,不會調用該方法。所以如果消息特別重要,對於未確認的消息,生產者應該在記憶體用保存著,在確認時候根據返回的id刪除該消息。如果是nack可以將該消息記錄專門的日誌或者轉發到相應處理的邏輯進行後續補償。RabbitTemplate也可以配置RetryTemplate,發送失敗時直接進行重試,具體還是要結合業務。
最後關於發送者確認需要提的是spring,因為spring預設的Bean是單例的,所以針對不同的確認方案(其實有不同的確認方案是比較合理的,很多消息不需要確認,有些需要確認)需要配置不同的bean.
消費消息、死信隊列和RetryTemplate
上面也提到瞭如果消費者拋出異常時預設的處理邏輯。另外我們還可以給消費者配置RetryTemplate,如果是採用SpringBoot的話,可以在application.yml配置中配置如下:
spring:
rabbitmq:
listener:
retry:
# 重試次數
max-attempts: 3
# 開啟重試機制
enabled: true
如上,如果消費者失敗的話會進行重試,預設是3次。註意這裡的重試機制RabbitMq是為感知的!到達3次之後會拋出異常調用MessageRecoverer
。預設的實現為RejectAndDontRequeueRecoverer,也就是列印異常,發送nack,不會重新入隊列。
我想既然配置了重試機制消息肯定是很重要的,消息肯定不能丟,僅僅是日誌可能會因為日誌滾動丟失而且信息不明顯,所以我們要講消息保存下來。可以有如下這些方案:
- 使用RepublishMessageRecoverer這個MessageRecoverer會發送發送消息到指定隊列
- 給隊列綁定死信隊列,因為預設的RepublishMessageRecoverer會發送nack並且requeue為false。這樣拋出一場是這種方式和上面的結果一樣都是轉發到了另外一個隊列。詳見DeadLetterConsumer
- 註冊自己實現的MessageRecoverer
- 給MessageListenerContainer設置RecoveryCallback
- 對於方法手動捕獲異常,進行處理
我比較推薦前兩種。這裡說下死信隊列,死信隊列其實就是普通的隊列,只不過一個隊列聲明的時候指定的屬性,會將死信轉發到該交換器中。聲明死信隊列方法如下:
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT, arguments = {
@Argument(name = RabbitMQConstant.DEAD_LETTER_EXCHANGE, value = RabbitMQConstant.DEAD_EXCHANGE),
@Argument(name = RabbitMQConstant.DEAD_LETTER_KEY, value = RabbitMQConstant.DEAD_KEY)
}),
key = RabbitMQConstant.DEFAULT_KEY
))
其實也就只是在聲明的時候多加了兩個參數x-dead-letter-exchange和x-dead-letter-routing-key。這裡一開始踩了一個坑,因為@QueueBinding
註解中也有arguments屬性,我一開始將參數聲明到@QueueBinding
中,導致一直沒綁定成功。如果綁定成功可以在控制台看到queue的Featrues有DLX(死信隊列交換器)和DLK(死信隊列綁定)。如下:
- 消息被拒絕(basic.reject/basic.nack)並且requeue=false
- 消息TTL過期
- 隊列達到最大長度
我們用到的就是第一種。
RPC模式的消息(不常用)
本來生產者和消費者是沒有耦合的,但是可以通過一些屬性產生耦合。在早期版本中,如果一個生產者想要收到消費者的回覆,實現方案是生產者在消息頭中加入reply-to屬性也就是隊列(一般是私有,排他,用完即銷毀)的名字,然後在這個隊列上進行監聽,消費者將回覆發送到這個隊列中。RabbitMq3.3之後有了改進,就是不用沒有都去創建一個臨時隊列,這樣很耗費性能,可以採用drect-to模式,省去了每次創建隊列的性能損耗,但是還是要創建一次隊列。現在Spring預設的就是這個模式。RabbitTemplate中有一系列的sendAndReceiveXX
方法。預設等待5秒,超時返回null。用
法和不帶返回的差不多。
消費者的方法通過返回值直接返回消息(下麵的方法是有返回值的):
public String receive(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("reply to consumer param:{headers = [" + headers + "], msg = [" + msg + "]} info:");
return REPLY;
}
這裡的提一下最後一個註解@SendTo
,用在消費方法上,指明返回值的目的地,預設不用的話就是返回給發送者,可以通過這個註解改變這種行為。如下代碼:
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.REPLY_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.REPLY_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = RabbitMQConstant.REPLY_KEY
)
)
@SendTo("queue.reply.s")
public ExampleEvent log(ExampleEvent event) {
log.info("log receive message:O{}", event);
return new ExampleEvent(1, "log result");
}
上面的代碼就是會將消息直接發送到預設交換器,並且以queue.reply.s作為路由鍵。@SendTo的格式為exchange/routingKey用法如下:
- foo/bar: 指定的交換器和key
- foo/: 指定的交換器,key為空
- bar或者/bar: 到空交換器
- /或者空:空的交換器和空的key
這裡還需要提一下,因為預設所有的隊列都會綁定到空交換器,並且以隊列名字作為Routekey, 所以SendTo裡面可以直接填寫隊列名字機會發送到相應的隊列.如日誌隊列。因為RPC模式不常用,專業的東西做專業的事,就像我們一般不用Redis來做消息隊列一樣(雖然他也可以實現),一般公司都有特定的技術棧,肯定有更合適的RPC通信框架。當然如果要跨語言的集成這個方案也是一種不錯的方案,可以繼續考慮採用非同步發送AsyncRabbitTemplate
來降低延遲等優化方案!
關於消費模型
RabbitMQ底層的消費模型有兩種Push和Pull。我在網上查閱資料的時候發現有很多教程採用了pull這種模式。RabbitMq實戰和
RabbitMQ之Consumer消費模式(Push & Pull)都指出這種模式性能低,會影響消息的吞吐量,增加不必要的IO,所以除非有特殊的業務需求,不要採用這種方案。Spring的封裝就是採用了push的方案。
關於RabbitMq客戶端的線程模型
這裡講的是消費者的,生產者沒什麼好講的。先看消息流轉圖:
圖中橢圓表示線程,矩形是隊列。消息到達AMQP的連接線程,然後分發到client線程池,隨後分發到監聽器。註意除了監聽器的線程,其他都是在com.rabbitmq.client.impl.AMQConnection
中創建的線程,我們對線程池做一些修改。連接線程名字不能修改就是AMQP Connection打頭。心跳線程可以設置setConnectionThreadFactory來設置名字。如下:
connectionFactory.setConnectionThreadFactory(new ThreadFactory() {
public final AtomicInteger id = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, MessageFormat.format("amqp-heart-{0}", id.getAndIncrement()));
}
});
client線程池見:com.rabbitmq.client.impl.ConsumerWorkService
構造方法。Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)。
final ExecutorService executorService = Executors.newFixedThreadPool(5, new ThreadFactory() {
public final AtomicInteger id = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, MessageFormat.format("amqp-client-{0}", id.getAndIncrement()));
}
});
listener的線程設置如下:
simpleRabbitListenerContainerFactory.setTaskExecutor(new SimpleAsyncTaskExecutor"amqp-consumer-"));
註意:SimpleAsyncTaskExecutor每次執行一個任務都會新建一個線程,對於生命周期很短的任務不要使用這個線程池(如client線程池的任務), 這裡的消費者線程生命周期直到SimpleMessageListenerContainer停止所以沒有適合這個場景
修改過之後的線程如下:
消息投遞過程如下:
- 在AMQConnection中開啟連接線程,該線程用於處理和RabbitMq的通信:
public void startMainLoop() {
MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
mainLoopThread.start();
}
- AMQConnection.heartbeatExecutor是心跳線程。
- AMQConnection.consumerWorkServiceExecutor則是用來處理事件的線程池,AMQConnection線程收到消息投遞到這裡。
分發邏輯詳見com.rabbitmq.client.impl.ChannelN#processAsync->com.rabbitmq.client.impl.ConsumerDispatcher#handleDelivery->投遞到線程池. - 線程池中繼續將消息投遞到org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#queue中
- consumer線程進行最終消息
上面的是預設的消費者監聽器。SpringAMQP 2.0引入了一個新的監聽器實現DirectMessageListenerContainer
。這個實現最大的變化在於消費者的處理邏輯不是在自己的線程池中執行而是直接在client線程池中處理,這樣最明顯的是省去了線程的上下文切換的開銷,而且設計上也變得更為直觀。所以如果採用這個監聽器需要覆蓋預設的線程池加大Connection的線程池。採用這個監聽器只需要設置@RabbitListener
的containerFactory屬性。聲明方法如下:
@Bean
DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
final DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory = new DirectRabbitListenerContainerFactory();
directRabbitListenerContainerFactory.setConsumersPerQueue(Runtime.getRuntime().availableProcessors());
directRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
directRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
directRabbitListenerContainerFactory.setConsumersPerQueue(10);
return directRabbitListenerContainerFactory;
}
這時的消息流轉圖如下:
還有一些關於監聽器的例子和Springboot配置我放在了源碼里,這裡不再講述。