Java SpringBoot集成RabbitMq實戰和總結

来源:https://www.cnblogs.com/chenfangzhi/archive/2018/09/26/9710698.html
-Advertisement-
Play Games

[TOC] 在公司里一直在用RabbitMQ,由於api已經封裝的很簡單,關於RabbitMQ本身還有封裝的實現沒有瞭解,最近在看RabbitMQ實戰這本書,結合網上的一些例子和spring文檔,實現了RabbitMQ和spring的集成,對著自己平時的疑惑做了一些總結。 關於RabbitMQ基礎不 ...


目錄

在公司里一直在用RabbitMQ,由於api已經封裝的很簡單,關於RabbitMQ本身還有封裝的實現沒有瞭解,最近在看RabbitMQ實戰這本書,結合網上的一些例子和spring文檔,實現了RabbitMQ和spring的集成,對著自己平時的疑惑做了一些總結。
關於RabbitMQ基礎不在詳細講解(本文不適合RabbitMq零基礎),RabbitMQ實戰的1,2,4三章講的非常不錯。因為書中講的都是Python和Php的例子,所以自己結合SpringBoot文檔和朱小廝的博客做了一些總結,寫了一些Springboot的例子

交換器、隊列、綁定的聲明

SpringAMQP項目對RabbitMQ做了很好的封裝,可以很方便的手動聲明隊列,交換器,綁定。如下:

    /**
     * 隊列
     * @return
     */
    @Bean
    @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE)
    Queue queue() {
        return new Queue(RabbitMQConstant.PROGRAMMATICALLY_QUEUE, false, false, true);
    }

    /**
     * 交換器
     * @return
     */
    @Bean
    @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE)
    TopicExchange exchange() {
        return new TopicExchange(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE, false, true);
    }
    /**
     * 聲明綁定關係
     * @return
     */
    @Bean
    Binding binding(@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange,
                    @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitMQConstant.PROGRAMMATICALLY_KEY);
    }

    /**
     * 聲明簡單的消費者,接收到的都是原始的{@link Message}
     *
     * @param connectionFactory
     *
     * @return
     */
    @Bean
    SimpleMessageListenerContainer simpleContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setMessageListener(message -> log.info("simple receiver,message:{}", message));
        container.setQueueNames(RabbitMQConstant.PROGRAMMATICALLY_QUEUE);
        return container;
    }

消費者和生產者都可以聲明,交換器這種一般經常創建,可以手動創建。需要註意對於沒有路由到隊列的消息會被丟棄。

如果是Spring的話還需要聲明連接:

    @Bean
    ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.port}") int port,
                                        @Value("${spring.rabbitmq.host}") String host,
                                        @Value("${spring.rabbitmq.username}") String userName,
                                        @Value("${spring.rabbitmq.password}") String password,
                                        @Value("${spring.rabbitmq.publisher-confirms}") boolean isConfirm,
                                        @Value("${spring.rabbitmq.virtual-host}") String vhost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(isConfirm);
    }

在配置類使用@EnableRabbit的情況下,也可以基於註解進行聲明,在Bean的方法上加上@RabbitListener,如下:

    /**
     * 可以直接通過註解聲明交換器、綁定、隊列。但是如果聲明的和rabbitMq中已經存在的不一致的話
     * 會報錯便於測試,我這裡都是不使用持久化,沒有消費者之後自動刪除
     * {@link RabbitListener}是可以重覆的。並且聲明隊列綁定的key也可以有多個.
     *
     * @param headers
     * @param msg
     */
    @RabbitListener(
        bindings = @QueueBinding(
            exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
                durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
            value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
                autoDelete = RabbitMQConstant.true_CONSTANT),
            key = DKEY
        ),
        //手動指明消費者的監聽容器,預設Spring為自動生成一個SimpleMessageListenerContainer
        containerFactory = "container",
        //指定消費者的線程數量,一個線程會打開一個Channel,一個隊列上的消息只會被消費一次(不考慮消息重新入隊列的情況),下麵的表示至少開啟5個線程,最多10個。線程的數目需要根據你的任務來決定,如果是計算密集型,線程的數目就應該少一些
        concurrency = "5-10"
    )
    public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
        log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
    }

    /**
     * {@link Queue#ignoreDeclarationExceptions}聲明隊列會忽略錯誤不聲明隊列,這個消費者仍然是可用的
     *
     * @param headers
     * @param msg
     */
    @RabbitListener(queuesToDeclare = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = RabbitMQConstant.true_CONSTANT))
    public void process2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
        log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
    }

關於消息序列化

這個比較簡單,預設採用了Java序列化,我們一般使用的Json格式,所以配置了Jackson,根據自己的情況來,直接貼代碼:

    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

同一個隊列多消費類型

如果是同一個隊列多個消費類型那麼就需要針對每種類型提供一個消費方法,否則找不到匹配的方法會報錯,如下:

@Component
@Slf4j
@RabbitListener(
    bindings = @QueueBinding(
        exchange = @Exchange(value = RabbitMQConstant.MULTIPART_HANDLE_EXCHANGE, type = ExchangeTypes.TOPIC,
            durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
        value = @Queue(value = RabbitMQConstant.MULTIPART_HANDLE_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
            autoDelete = RabbitMQConstant.true_CONSTANT),
        key = RabbitMQConstant.MULTIPART_HANDLE_KEY
    )
)
@Profile(SpringConstant.MULTIPART_PROFILE)
public class MultipartConsumer {

    /**
     * RabbitHandler用於有多個方法時但是參數類型不能一樣,否則會報錯
     *
     * @param msg
     */
    @RabbitHandler
    public void process(ExampleEvent msg) {
        log.info("param:{msg = [" + msg + "]} info:");
    }

    @RabbitHandler
    public void processMessage2(ExampleEvent2 msg) {
        log.info("param:{msg2 = [" + msg + "]} info:");
    }

    /**
     * 下麵的多個消費者,消費的類型不一樣沒事,不會被調用,但是如果缺了相應消息的處理Handler則會報錯
     *
     * @param msg
     */
    @RabbitHandler
    public void processMessage3(ExampleEvent3 msg) {
        log.info("param:{msg3 = [" + msg + "]} info:");
    }


}

註解將消息和消息頭註入消費者方法

在上面也看到了@Payload等註解用於註入消息。這些註解有:

  • @Header 註入消息頭的單個屬性
  • @Payload 註入消息體到一個JavaBean中
  • @Headers 註入所有消息頭到一個Map中

這裡有一點主要註意,如果是com.rabbitmq.client.Channel,org.springframework.amqp.core.Messageorg.springframework.messaging.Message這些類型,可以不加註解,直接可以註入。
如果不是這些類型,那麼不加註解的參數將會被當做消息體。不能多於一個消息體。如下方法ExampleEvent就是預設的消息體:

public void process2(@Headers Map<String, Object> headers,ExampleEvent msg);

關於消費者確認

RabbitMq消費者可以選擇手動和自動確認兩種模式,如果是自動,消息已到達隊列,RabbitMq對無腦的將消息拋給消費者,一旦發送成功,他會認為消費者已經成功接收,在RabbitMq內部就把消息給刪除了。另外一種就是手動模式,手動模式需要消費者對每條消息進行確認(也可以批量確認),RabbitMq發送完消息之後,會進入到一個待確認(unacked)的隊列,如下圖紅框部分:

如果消費者發送了ack,RabbitMq將會把這條消息從待確認中刪除。如果是nack並且指明不要重新入隊列,那麼該消息也會刪除。但是如果是nack且指明瞭重新入隊列那麼這條消息將會入隊列,然後重新發送給消費者,被重新投遞的消息消息頭amqp_redelivered屬性會被設置成true,客戶端可以依靠這點來判斷消息是否被確認,可以好好利用這一點,如果每次都重新回隊列會導致同一消息不停的被髮送和拒絕。消費者在確認消息之前和RabbitMq失去了連接那麼消息也會被重新投遞。所以手動確認模式很大程度上提高可靠性。自動模式的消息可以提高吞吐量。

spring手動確認消息需要將SimpleRabbitListenerContainerFactory設置為手動模式:

        simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

手動確認的消費者代碼如下:

@SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = RabbitMQConstant.CONFIRM_EXCHANGE, type = ExchangeTypes.TOPIC,
            durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
        value = @Queue(value = RabbitMQConstant.CONFIRM_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
            autoDelete = RabbitMQConstant.true_CONSTANT),
        key = RabbitMQConstant.CONFIRM_KEY),
        containerFactory = "containerWithConfirm")
    public void process(ExampleEvent msg, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag,
                        @Header("amqp_redelivered") boolean redelivered, @Headers Map<String, String> head) {
        try {
            log.info("ConsumerWithConfirm receive message:{},header:{}", msg, head);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("consume confirm error!", e);
            //這一步千萬不要忘記,不會會導致消息未確認,消息到達連接的qos之後便不能再接收新消息
            //一般重試肯定的有次數,這裡簡單的根據是否已經重發過來來決定重發。第二個參數表示是否重新分發
            channel.basicReject(deliveryTag, !redelivered);
            //這個方法我知道的是比上面多一個批量確認的參數
            // channel.basicNack(deliveryTag, false,!redelivered);
        }
    }

關於spring的AcknowledgeMode需要說明,他一共有三種模式:NONE,MANUAL,AUTO,預設是AUTO模式。這比RabbitMq原生多了一種。這一點很容易混淆,這裡的NONE對應其實就是RabbitMq的自動確認,MANUAL是手動。而AUTO其實也是手動模式,只不過是Spring的一層封裝,他根據你方法執行的結果自動幫你發送ack和nack。如果方法未拋出異常,則發送ack。如果方法拋出異常,並且不是AmqpRejectAndDontRequeueException則發送nack,並且重新入隊列。如果拋出異常時AmqpRejectAndDontRequeueException則發送nack不會重新入隊列。我有一個例子專門測試NONE,見CunsumerWithNoneTest

還有一點需要註意的是消費者有一個參數prefetch,它表示的是一個Channel(也就是SimpleMessageListenerContainer的一個線程)預取的消息數量,這個參數只會在手動確認的消費者才生效。可以客戶端利用這個參數來提高性能和做流量控制。如果prefetch設置的是10,當這個Channel上unacked的消息數量到達10條時,RabbitMq便不會在向你發送消息,客戶端如果處理的慢,便可以延遲確認在方法消息的接收。至於提高性能就非常容易理解,因為這個是批量獲取消息,如果客戶端處理的很快便不用一個一個去等著去新的消息。SpringAMQP2.0開始預設是250,這個參數應該已經足夠了。註意之前的版本預設值是1所以有必要重新設置一下值。當然這個值也不能設置的太大,RabbitMq是通過round robin這個策略來做負載均衡的,如果設置的太大會導致消息不多時一下子積壓到一臺消費者,不能很好的均衡負載。另外如果消息數據量很大也應該適當減小這個值,這個值過大會導致客戶端記憶體占用問題。如果你用到了事務的話也需要考慮這個值的影響,因為事務的用處不大,所以我也沒做過多的深究。

關於發送者確認模式

考慮這樣一個場景:你發送了一個消息給RabbitMq,RabbitMq接收了但是存入磁碟之前伺服器就掛了,消息也就丟了。為了保證消息的投遞有兩種解決方案,最保險的就是事務(和DB的事務沒有太大的可比性), 但是因為事務會極大的降低性能,會導致生產者和RabbitMq之間產生同步(等待確認),這也違背了我們使用RabbitMq的初衷。所以一般很少採用,這就引入第二種方案:發送者確認模式。

發送者確認模式是指發送方發送的消息都帶有一個id,RabbitMq會將消息持久化到磁碟之後通知生產者消息已經成功投遞,如果因為RabbitMq內部的錯誤會發送ack。註意這裡的發送者和RabbitMq之間是非同步的,所以相較於事務機制性能大大提高。其實很多操作都是不能保證絕對的百分之一百的成功,哪怕採用了事務也是如此,可靠性和性能很多時候需要做一些取捨,想很多互聯網公司吹噓的5個9,6個9也是一樣的道理。如果不是重要的消息性能計數器,完全可以不採用發送者確認模式。

這裡有一點我當時糾結了很久,我一直以為發送者確認模式的回調是客戶端的ack觸發的,這裡是大大的誤解!發送者確認模式和消費者沒有一點關係,消費者確認也和發送者沒有一點關係,兩者都是在和RabbitMq打交道,發送者不會管消費者有沒有收到,只要消息到了RabbitMq並且已經持久化便會通知生產者,這個ack是RabbitMq本身發出的,和消費者無關

發送者確認模式需要將Channel設置成Confirm模式,這樣才會收到通知。Spring中需要將連接設置成Confirm模式:

connectionFactory.setPublisherConfirms(isConfirm);

然後在RabbitTemplate中設置確認的回調,correlationData是消息的id,如下(只是簡單列印下):

    // 設置RabbitTemplate每次發送消息都會回調這個方法
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause)
            -> log.info("confirm callback id:{},ack:{},cause:{}", correlationData, ack, cause));

發送時需要給出唯一的標識(CorrelationData):

    rabbitTemplateWithConfirm.convertAndSend(RabbitMQConstant.DEFAULT_EXCHANGE, RabbitMQConstant.DEFAULT_KEY,
                new ExampleEvent(i, "confirm message id:" + i),
                new CorrelationData(Integer.toString(i)));

還有一個參數需要說下:mandatory。這個參數為true表示如果發送消息到了RabbitMq,沒有對應該消息的隊列。那麼會將消息返回給生產者,此時仍然會發送ack確認消息。

設置RabbitTemplate的回調如下:

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)
            -> log.info("return callback message:{},code:{},text:{}", message, replyCode, replyText));

另外如果是RabbitMq內部的錯誤,不會調用該方法。所以如果消息特別重要,對於未確認的消息,生產者應該在記憶體用保存著,在確認時候根據返回的id刪除該消息。如果是nack可以將該消息記錄專門的日誌或者轉發到相應處理的邏輯進行後續補償。RabbitTemplate也可以配置RetryTemplate,發送失敗時直接進行重試,具體還是要結合業務。

最後關於發送者確認需要提的是spring,因為spring預設的Bean是單例的,所以針對不同的確認方案(其實有不同的確認方案是比較合理的,很多消息不需要確認,有些需要確認)需要配置不同的bean.

消費消息、死信隊列和RetryTemplate

上面也提到瞭如果消費者拋出異常時預設的處理邏輯。另外我們還可以給消費者配置RetryTemplate,如果是採用SpringBoot的話,可以在application.yml配置中配置如下:

spring:
  rabbitmq:
    listener:
       retry:
    #    重試次數
          max-attempts: 3
        #   開啟重試機制
          enabled: true

如上,如果消費者失敗的話會進行重試,預設是3次。註意這裡的重試機制RabbitMq是為感知的!到達3次之後會拋出異常調用MessageRecoverer。預設的實現為RejectAndDontRequeueRecoverer,也就是列印異常,發送nack,不會重新入隊列。
我想既然配置了重試機制消息肯定是很重要的,消息肯定不能丟,僅僅是日誌可能會因為日誌滾動丟失而且信息不明顯,所以我們要講消息保存下來。可以有如下這些方案:

  1. 使用RepublishMessageRecoverer這個MessageRecoverer會發送發送消息到指定隊列
  2. 給隊列綁定死信隊列,因為預設的RepublishMessageRecoverer會發送nack並且requeue為false。這樣拋出一場是這種方式和上面的結果一樣都是轉發到了另外一個隊列。詳見DeadLetterConsumer
  3. 註冊自己實現的MessageRecoverer
  4. 給MessageListenerContainer設置RecoveryCallback
  5. 對於方法手動捕獲異常,進行處理

我比較推薦前兩種。這裡說下死信隊列,死信隊列其實就是普通的隊列,只不過一個隊列聲明的時候指定的屬性,會將死信轉發到該交換器中。聲明死信隊列方法如下:

    @RabbitListener(
        bindings = @QueueBinding(
            exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
                durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
            value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
                autoDelete = RabbitMQConstant.true_CONSTANT, arguments = {
                @Argument(name = RabbitMQConstant.DEAD_LETTER_EXCHANGE, value = RabbitMQConstant.DEAD_EXCHANGE),
                @Argument(name = RabbitMQConstant.DEAD_LETTER_KEY, value = RabbitMQConstant.DEAD_KEY)
            }),
            key = RabbitMQConstant.DEFAULT_KEY
        ))

其實也就只是在聲明的時候多加了兩個參數x-dead-letter-exchange和x-dead-letter-routing-key。這裡一開始踩了一個坑,因為@QueueBinding註解中也有arguments屬性,我一開始將參數聲明到@QueueBinding中,導致一直沒綁定成功。如果綁定成功可以在控制台看到queue的Featrues有DLX(死信隊列交換器)和DLK(死信隊列綁定)。如下:

關於消息進入死信的規則

  1. 消息被拒絕(basic.reject/basic.nack)並且requeue=false
  2. 消息TTL過期
  3. 隊列達到最大長度

我們用到的就是第一種。

RPC模式的消息(不常用)

本來生產者和消費者是沒有耦合的,但是可以通過一些屬性產生耦合。在早期版本中,如果一個生產者想要收到消費者的回覆,實現方案是生產者在消息頭中加入reply-to屬性也就是隊列(一般是私有,排他,用完即銷毀)的名字,然後在這個隊列上進行監聽,消費者將回覆發送到這個隊列中。RabbitMq3.3之後有了改進,就是不用沒有都去創建一個臨時隊列,這樣很耗費性能,可以採用drect-to模式,省去了每次創建隊列的性能損耗,但是還是要創建一次隊列。現在Spring預設的就是這個模式。RabbitTemplate中有一系列的sendAndReceiveXX方法。預設等待5秒,超時返回null。用
法和不帶返回的差不多。

消費者的方法通過返回值直接返回消息(下麵的方法是有返回值的):

  public String receive(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
        log.info("reply to consumer param:{headers = [" + headers + "], msg = [" + msg + "]} info:");
        return REPLY;
  }

這裡的提一下最後一個註解@SendTo,用在消費方法上,指明返回值的目的地,預設不用的話就是返回給發送者,可以通過這個註解改變這種行為。如下代碼:

 @RabbitListener(
        bindings = @QueueBinding(
            exchange = @Exchange(value = RabbitMQConstant.REPLY_EXCHANGE, type = ExchangeTypes.TOPIC,
                durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
            value = @Queue(value = RabbitMQConstant.REPLY_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
                autoDelete = RabbitMQConstant.true_CONSTANT),
            key = RabbitMQConstant.REPLY_KEY
        )
    )
    @SendTo("queue.reply.s")
    public ExampleEvent log(ExampleEvent event) {
        log.info("log receive message:O{}", event);
        return new ExampleEvent(1, "log result");
    }

上面的代碼就是會將消息直接發送到預設交換器,並且以queue.reply.s作為路由鍵。@SendTo的格式為exchange/routingKey用法如下:

  1. foo/bar: 指定的交換器和key
  2. foo/: 指定的交換器,key為空
  3. bar或者/bar: 到空交換器
  4. /或者空:空的交換器和空的key

這裡還需要提一下,因為預設所有的隊列都會綁定到空交換器,並且以隊列名字作為Routekey, 所以SendTo裡面可以直接填寫隊列名字機會發送到相應的隊列.如日誌隊列。因為RPC模式不常用,專業的東西做專業的事,就像我們一般不用Redis來做消息隊列一樣(雖然他也可以實現),一般公司都有特定的技術棧,肯定有更合適的RPC通信框架。當然如果要跨語言的集成這個方案也是一種不錯的方案,可以繼續考慮採用非同步發送AsyncRabbitTemplate來降低延遲等優化方案!

關於消費模型

RabbitMQ底層的消費模型有兩種Push和Pull。我在網上查閱資料的時候發現有很多教程採用了pull這種模式。RabbitMq實戰和
RabbitMQ之Consumer消費模式(Push & Pull)都指出這種模式性能低,會影響消息的吞吐量,增加不必要的IO,所以除非有特殊的業務需求,不要採用這種方案。Spring的封裝就是採用了push的方案。

關於RabbitMq客戶端的線程模型

這裡講的是消費者的,生產者沒什麼好講的。先看消息流轉圖:

圖中橢圓表示線程,矩形是隊列。消息到達AMQP的連接線程,然後分發到client線程池,隨後分發到監聽器。註意除了監聽器的線程,其他都是在com.rabbitmq.client.impl.AMQConnection中創建的線程,我們對線程池做一些修改。連接線程名字不能修改就是AMQP Connection打頭。心跳線程可以設置setConnectionThreadFactory來設置名字。如下:

    connectionFactory.setConnectionThreadFactory(new ThreadFactory() {
            public final AtomicInteger id = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, MessageFormat.format("amqp-heart-{0}", id.getAndIncrement()));
            }
        });

client線程池見:com.rabbitmq.client.impl.ConsumerWorkService構造方法。Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)。

   final ExecutorService executorService = Executors.newFixedThreadPool(5, new ThreadFactory() {
            public final AtomicInteger id = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, MessageFormat.format("amqp-client-{0}", id.getAndIncrement()));
            }
        });

listener的線程設置如下:

        simpleRabbitListenerContainerFactory.setTaskExecutor(new SimpleAsyncTaskExecutor"amqp-consumer-"));

註意:SimpleAsyncTaskExecutor每次執行一個任務都會新建一個線程,對於生命周期很短的任務不要使用這個線程池(如client線程池的任務), 這裡的消費者線程生命周期直到SimpleMessageListenerContainer停止所以沒有適合這個場景

修改過之後的線程如下:

消息投遞過程如下:

  1. 在AMQConnection中開啟連接線程,該線程用於處理和RabbitMq的通信:
    public void startMainLoop() {
        MainLoop loop = new MainLoop();
        final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
        mainLoopThread = Environment.newThread(threadFactory, loop, name);
        mainLoopThread.start();
    }
  1. AMQConnection.heartbeatExecutor是心跳線程。
  2. AMQConnection.consumerWorkServiceExecutor則是用來處理事件的線程池,AMQConnection線程收到消息投遞到這裡。
    分發邏輯詳見com.rabbitmq.client.impl.ChannelN#processAsync->com.rabbitmq.client.impl.ConsumerDispatcher#handleDelivery->投遞到線程池.
  3. 線程池中繼續將消息投遞到org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#queue中
  4. consumer線程進行最終消息

上面的是預設的消費者監聽器。SpringAMQP 2.0引入了一個新的監聽器實現DirectMessageListenerContainer。這個實現最大的變化在於消費者的處理邏輯不是在自己的線程池中執行而是直接在client線程池中處理,這樣最明顯的是省去了線程的上下文切換的開銷,而且設計上也變得更為直觀。所以如果採用這個監聽器需要覆蓋預設的線程池加大Connection的線程池。採用這個監聽器只需要設置@RabbitListener的containerFactory屬性。聲明方法如下:

    @Bean
    DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        final DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory = new DirectRabbitListenerContainerFactory();
        directRabbitListenerContainerFactory.setConsumersPerQueue(Runtime.getRuntime().availableProcessors());
        directRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
        directRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        directRabbitListenerContainerFactory.setConsumersPerQueue(10);
        return directRabbitListenerContainerFactory;
    }

這時的消息流轉圖如下:

還有一些關於監聽器的例子和Springboot配置我放在了源碼里,這裡不再講述。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 美國時間 09 月 25 日,Oralce 正式發佈了 Java 11,這是據 Java 8 以後支持的首個長期版本。 為什麼說是長期版本,看下麵的官方發佈的支持路線圖表。 可以看出 Java 8 擴展支持到 2025 年,而 Java 11 擴展支持到 2026 年。 現在大部分都在用 Java ...
  • 程式員應該將核心關註點放在業務上,而不應該將時間過多的浪費在CRUD中,多數的ORM框架都把增加、修改與刪除做得非常不錯了,然後資料庫中查詢無疑是使用頻次最高、複雜度大、與性能密切相關的操作,我們希望得到一種使用方便,查詢靈活的ORM框架,MyBatis可以滿足這些要求,MyBatis是一個支持普通 ...
  • Java當中的泛型 01 這就存在一個問題,如果集合存儲元素時,而且存儲對象有很多,而且對象類型不相同,就很容易導致隱患。 在 中該文件 在編譯的時候不會出現錯誤是因為該存儲的是 的任何類型的對象,所以不會出現錯誤,編譯通過了。編譯後為 到運行。 如果要解決問題,可以把問題提前到編譯的時候去解決,讓 ...
  • 智能指針 shared_ptr 使用 上一篇 "智能指針是啥玩意" ,介紹了什麼是智能指針。 這一篇簡單說說如何使用智能指針。 一,智能指針分3類:今天只嘮嘮shared_ptr shared_ptr unique_ptr weak_ptr 二,下表是shared_ptr和unique_ptr都支持 ...
  • 異常錯誤:The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents more than one time zone. You must configure either the server or JDBC dri ...
  • 深淺拷貝 一、淺拷貝 列表中存儲的是數據的記憶體地址,當我們要查詢或修改列表中的數據時,我們是通過列表中的地址找到要訪問的記憶體。當我們修改列表中的數據時,如果修改的是一個不可變類型(整型,長整型,浮點數,複數,布爾,字元串,元組),會開拓一個新的記憶體空間用於存放新的數據,然後把列表中的地址修改為存放新 ...
  • No rabbit death problem ...
  • 迭代器 在Python如果一個對象可被迴圈(遍歷)該對象中每一個元素的過程叫做迭代。例如 ,字典、字元串、列表、元祖、集合等。他們可被迭代的原因是,都有一個共同的內置函數__iter__。通過執行內置對象的__next__函數,可以依次列印該對象的所有元素。例如 有一個列表,該列表存儲了1-100的 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...