Spring Boot消息隊列應用實踐

来源:https://www.cnblogs.com/jeffwongishandsome/archive/2018/04/30/spring-boot-integrate-messaging-queue-practise.html
-Advertisement-
Play Games

消息隊列是大型複雜系統解耦利器。本文根據應用廣泛的消息隊列RabbitMQ,介紹Spring Boot應用程式中隊列中間件的開發和應用。 一、RabbitMQ基礎 1、RabbitMQ簡介 RabbitMQ是Spring所在公司Pivotal自己的產品,是基於AMQP高級隊列協議的消息中間件,採用e ...


消息隊列是大型複雜系統解耦利器。本文根據應用廣泛的消息隊列RabbitMQ,介紹Spring Boot應用程式中隊列中間件的開發和應用。

一、RabbitMQ基礎

1、RabbitMQ簡介

RabbitMQ是Spring所在公司Pivotal自己的產品,是基於AMQP高級隊列協議的消息中間件,採用erlang開發,所以你的RabbitMQ隊列伺服器需要erlang環境。

可以直接官方的說法:RabbitMQ is the most widely deployed open source message broker.言簡意賅,一目瞭然。

2、AMQP

高級消息隊列協議(AMQP)是一個非同步消息傳遞所使用的應用層協議規範。作為線路層協議(AMQP是一個抽象的協議,它不負責處理具體的數據),而不是API(例如Java消息系統JMS),AMQP客戶端能夠無視消息的來源任意發送和接受信息。
AMQP的原始用途只是為金融界提供一個可以彼此協作的消息協議,而現在的目標則是為通用消息隊列架構提供通用構建工具。因此,面向消息的中間件(MOM)系統,例如發佈/訂閱隊列,沒有作為基本元素實現。反而通過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規範的一部分,形成了線上路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如之前提到的發佈/訂閱,隊列,事務以及流數據,並且添加了額外的特性,例如更易於擴展,基於內容的路由。

擴展閱讀:既然有高級的消息協議,必然有簡單的協議,STOMP(Simple (or Streaming) Text Orientated Messaging Protocol),也就是簡單消息文本協議,猛擊這裡

3、MSMQ

這裡附帶介紹一下MSMQ。.NET開發者接觸最多的可能還是這個消息隊列,我知道有兩個以.NET作為主要開發語言的公司都選擇MSMQ來開發公共框架如ESB、日誌組件等。

如果你有.NET下MSMQ(微軟消息隊列)開發和使用經驗,一定不會對隊列常用術語陌生。對比一下,對後面RabbitMQ的學習和理解非常有幫助。

邏輯結構如下:

4、基本術語  

安裝好RabbitMQ後,可以啟用插件,打開RabbitMQ自帶的後臺,一圖勝千言,你會看到很多似曾相識的技術術語和名詞。

當然你也可以參考這裡的圖片示例一個一個驗證下麵的名詞。

(1)Broker:消息隊列伺服器實體。

(2)Producer:生產者。

(3)Consumer:消費者。

(4)Queue(隊列):消息隊列載體,每個消息都會被投入到一個或多個隊列。Queue是 RabbitMQ 的內部對象,用於存儲消息;消費者Consumer就是通過訂閱隊列來獲取消息的,RabbitMQ 中的消息都只能存儲在 Queue 中,生產者Producer生產消息並最終投遞到 Queue 中,消費者可以從 Queue 中獲取消息並消費;多個消費者可以訂閱同一個 Queue。

(5)Connection(連接):Producer 和 Consumer 通過TCP 連接到 RabbitMQ Server。

(6)Channel(通道):基於 Connection創建,數據流動都是在 Channel 中進行。

(7)Exchange(交換器):生產者將消息發送到 Exchange(交換器),由Exchange 將消息路由到一個或多個 Queue 中(或者丟棄);Exchange 並不存儲消息;Exchange Types 常用的有 Fanout、Direct、Topic 和Header四種類型,每種類型對應不同的路由規則:
Direct:完全匹配,消息路由到那些 Routing Key 與 Binding Key 完全匹配的 Queue 中。比如 Routing Key 為mq_cleint-key,只會轉發mq_cleint-key,不會轉發mq_cleint-key.1,也不會轉發mq_cleint-key.1.2。
Topic:模式匹配,Exchange 會把消息發送到一個或者多個滿足通配符規則的 routing-key 的 Queue。其中*表號匹配一個 word,#匹配多個 word 和路徑,路徑之間通過.隔開。如滿足a.*.c的 routing-key 有a.hello.c;滿足#.hello的 routing-key 有a.b.c.hello。
Fanout:忽略匹配,把所有發送到該 Exchange 的消息路由到所有與它綁定 的Queue 中。

Header:也根據規則匹配,相較於Direct和Topic固定地使用RoutingKey ,Headers 則是一個自定義匹配規則的類型。在隊列與交換器綁定時, 會設定一組鍵值對(Key-Value)規則, 消息中也包括一組鍵值對( Headers 屬性), 當這些鍵值對有一對,,或全部匹配時, 消息被投送到對應隊列。

(8)Binding(綁定):是 Exchange(交換器)將消息路由給 Queue 所需遵循的規則。

(9)Routing Key(路由鍵):消息發送給 Exchange(交換器)時,消息將擁有一個路由鍵(預設為空), Exchange(交換器)根據這個路由鍵將消息發送到匹配的隊列中。

(10)Binding Key(綁定鍵):指定當前 Exchange(交換器)下,什麼樣的 Routing Key(路由鍵)會被下派到當前綁定的 Queue 中。

5、應用場景

我們使用一個技術或組件或中間件,必須要非常理解它的適用場景,否則很容易誤用。

RabbitMQ的經典應用場景包括:非同步處理、應用解耦、流量削峰、日誌處理、消息通訊。

已經有很多人總結了這5種場景下的RabbitMQ實際應用。

推薦閱讀:猛擊這裡

到這裡,RabbitMQ基礎知識介紹結束,下麵開始動手實踐。

添加依賴

      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
RabbitMQ

配置RabbitMQ

## RabbitMQ相關配置
spring.application.name=springbootdemo
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=springbootmq
spring.rabbitmq.password=123456
application.mq.properties

新增RabbitMQConfig類

package com.power.demo.messaging;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ消息隊列配置類
 * <p>
 * 註意:如果已在配置文件中聲明瞭Queue對象,就不用在RabbitMQ的管理員頁面創建隊列(Queue)了
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 聲明接收字元串的隊列 Hello 預設
     *
     * @return
     */
    @Bean
    public Queue stringQueue() {

        //boolean isDurable = true;//是否持久化
        //boolean isExclusive = false;  //僅創建者可以使用的私有隊列,斷開後自動刪除
        //boolean isAutoDelete = false;  //當所有消費客戶端連接斷開後,是否自動刪除隊列
        //Queue queue = new Queue(MQField.HELLO_STRING_QUEUE, isDurable, isExclusive, isAutoDelete);
        //return  queue;

        //return new Queue(MQField.HELLO_STRING_QUEUE); //預設支持持久化

        return QueueBuilder.durable(MQField.HELLO_STRING_QUEUE)
                //.exclusive()
                //.autoDelete()
                .build();
    }

    /**
     * 聲明接收Goods對象的隊列 Hello  支持持久化
     *
     * @return
     */
    @Bean
    public Queue goodsQueue() {

        return QueueBuilder.durable(MQField.HELLO_GOODS_QUEUE).build();
    }

    /**
     * 聲明WorkQueue隊列 competing consumers pattern,多個消費者不會重覆消費隊列的相同消息
     *
     * @return
     */
    @Bean
    public Queue workQueue() {
        return QueueBuilder.durable(MQField.MY_WORKER_QUEUE).build();
    }

    /**
     * 消息隊列中最常見的模式:發佈訂閱模式
     * <p>
     * 聲明發佈訂閱模式隊列 Publish/Subscribe
     * <p>
     * exchange類型包括:direct, topic, headers 和 fanout
     **/

    /*fanout(廣播)隊列相關聲明開始*/
    @Bean
    public Queue fanOutAQueue() {
        return QueueBuilder.durable(MQField.MY_FANOUTA_QUEUE).build();
    }

    @Bean
    public Queue fanOutBQueue() {
        return QueueBuilder.durable(MQField.MY_FANOUTB_QUEUE).build();
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return (FanoutExchange) ExchangeBuilder.fanoutExchange(MQField.MY_FANOUT_EXCHANGE).build();

        //return new FanoutExchange(MQField.MY_FANOUT_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeA(Queue fanOutAQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutAQueue).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue fanOutBQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanOutBQueue).to(fanoutExchange);
    }

    /*fanout隊列相關聲明結束*/


    /*topic隊列相關聲明開始*/

    @Bean
    public Queue topicAQueue() {
        return QueueBuilder.durable(MQField.MY_TOPICA_QUEUE).build();
    }

    @Bean
    public Queue topicBQueue() {
        return QueueBuilder.durable(MQField.MY_TOPICB_QUEUE).build();
    }

    @Bean
    TopicExchange topicExchange() {
        return (TopicExchange) ExchangeBuilder.topicExchange(MQField.MY_TOPIC_EXCHANGE).build();
    }

    //綁定時,註意隊列名稱與上述方法名一致
    @Bean
    Binding bindingTopicAExchangeMessage(Queue topicAQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicAQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYA);
    }

    @Bean
    Binding bindingTopicBExchangeMessages(Queue topicBQueue, TopicExchange topicExchange) {

        return BindingBuilder.bind(topicBQueue).to(topicExchange).with(MQField.MY_TOPIC_ROUTINGKEYB);

    }

    /*topic隊列相關聲明結束*/

    /*direct隊列相關聲明開始*/

    @Bean
    public Queue directAQueue() {
        return QueueBuilder.durable(MQField.MY_DIRECTA_QUEUE).build();
    }

    @Bean
    public Queue directBQueue() {
        return QueueBuilder.durable(MQField.MY_DIRECTB_QUEUE).build();
    }

    /**
     * 聲明Direct交換機 支持持久化.
     *
     * @return the exchange
     */
    @Bean
    DirectExchange directExchange() {
        return (DirectExchange) ExchangeBuilder.directExchange(MQField.MY_DIRECT_EXCHANGE).durable(true).build();
    }

    @Bean
    Binding bindingDirectAExchangeMessage(Queue directAQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directAQueue).to(directExchange).with(MQField.MY_DIRECT_ROUTINGKEYA);
    }

    @Bean
    Binding bindingDirectBExchangeMessage(Queue directBQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directBQueue).to(directExchange)
                //.with(MQField.MY_DIRECT_ROUTINGKEYB)
                .with(MQField.MY_DIRECT_ROUTINGKEYB);
    }

    /*direct隊列相關聲明結束*/
}
RabbitMQConfig

RabbitMQConfig我將常用到的模式都配置在裡面了,註釋已經寫得很清楚,在詳細介紹模式的地方會用到這裡定義的隊列、綁定和交換器。

持久化配置

在RabbitMQConfig類中尤其註意這幾個參數,包括是否可持久化durable;僅創建者可以使用的私有隊列,斷開後自動刪除exclusive;當所有消費客戶端連接斷開後,是否自動刪除隊列autoDelete。其中durable和autoDelete對隊列和交換器都可以配置。

RabbitMQ支持的消息的持久化(durable),也就是將數據寫在磁碟上,為了數據安全考慮,絕大多數場景下我們都會選擇持久化,可能記錄一些不是業務必須的日誌稍微例外。
消息隊列持久化包括3個部分:

(1)、隊列持久化,在聲明時指定Queue.durable為1

(2)、交換器持久化,在聲明時指定Exchange.durable為1

(3)、消息持久化,在投遞時指定消息的delivery_mode為2(而1表示非持久化) 參考:這裡

如果Exchange和Queue都是持久化的,那麼它們之間的Binding也是持久化的;如果Exchange和Queue兩者之間有一個持久化,另一個非持久化,就不允許建立綁定。

二、常見模式

在Spring Boot下使用RabbitMQ非常容易,直接調用AmqpTemplate類封裝好的介面即可。

1、hello world

 

P為生產者,C為消費者,中間紅色框表示消息隊列。生產者P將消息發送到消息隊列Queue,消費者C對消息進行處理。

生產者:

package com.power.demo.messaging.hello;

import com.power.demo.entity.vo.GoodsVO;
import com.power.demo.messaging.MQField;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Hello消息生產者
 **/
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public boolean send(String message) throws Exception {
        boolean isOK = false;

        if (StringUtils.isEmpty(message)) {
            System.out.println("消息為空");
            return isOK;
        }

        rabbitTemplate.convertAndSend(MQField.HELLO_STRING_QUEUE, message);

        isOK = true;

        System.out.println(String.format("HelloSender發送字元串消息結果:%s", isOK));

        return isOK;
    }

    public boolean send(GoodsVO goodsVO) throws Exception {

        boolean isOK = false;

        rabbitTemplate.convertAndSend(MQField.HELLO_GOODS_QUEUE, goodsVO);

        isOK = true;

        System.out.println(String.format("HelloSender發送對象消息結果:%s", isOK));

        return isOK;

    }

}
HelloSender

消費者:

package com.power.demo.messaging.hello;

import com.power.demo.entity.vo.GoodsVO;
import com.power.demo.messaging.MQField;
import com.power.demo.util.SerializeUtil;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Hello消息消費者
 **/
@Component
public class HelloReceiver {

    @RabbitListener(queues = MQField.HELLO_STRING_QUEUE)
    @RabbitHandler
    public void process(String message) {

        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("HelloReceiver接收到的字元串消息是 => " + message);
    }


    @RabbitListener(queues = MQField.HELLO_GOODS_QUEUE)
    @RabbitHandler
    public void process(GoodsVO goodsVO) {
        System.out.println("------ 接收實體對象 ------");
        System.out.println("HelloReceiver接收到的實體對象是 => " + SerializeUtil.Serialize(goodsVO));
    }
}
HelloReceiver

這是最簡單的一種模式,這個最簡單示例,可以看到應用場景里的非同步處理的影子。

在Controller中,新增一個介面:

    @RequestMapping(value = "/hello/sendmsg", method = RequestMethod.GET)
    @ApiOperation("簡單字元串消息測試")
    @ApiImplicitParams({
            @ApiImplicitParam(paramType = "query", name = "message", required = true, value = "字元串消息", dataType = "String")
    })
    public String sendMsg(String message) throws Exception {

        boolean isOK = helloSender.send(message);

        return String.valueOf(isOK);
    }
sendmsg

按照傳統方式調用RPC介面,通常都是同步等待介面返回,而使用隊列後,消息生產者直接向RabbitMQ伺服器發送一條消息,不需要同步等待這個消息的處理結果。

示例代碼中,消息消費者會刻意等待5秒(Thread.sleep(5000);)後才處理(列印出)消息,但是實際調用這個介面的時候,非常快就返回成功結果了,因為這個發送消息的動作不需要等待消費者消費消息的結果。

發送的消息,除了簡單消息對象如字元串等,示例里你還看到有一個發送商品對象的消息,也就是說明RabbitMQ支持自定義的複雜對象消息。

2、work queues

P為生產者,C1、C2為消費者,中間紅色框表示消息隊列。生產者P將消息發送到消息隊列Queue,消費者C1和C2對消息進行處理。

這種模式比較容易產生誤解的地方是,多個消費者會不會消費隊列里的同一條消息。答案是不會。

官方的說明是因為消費者根據競爭消費模式(competing consumers pattern)分派任務(Distributing tasks among workers (the competing consumers pattern) )。

對於work queues這種模式,同一條消息M1,要麼C1拉取到,要麼C2拉取到,不會出現C1和C2同時拉取到並消費。

當然,這種模式還可以擴展,除了一個生產者,也可以有多個生產者。

生產者:

package com.power.demo.messaging.workqueues;

import com.power.demo.messaging.MQField;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class WorkProducerA {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public boolean send(String message) throws Exception {
        boolean isOK = false;

        if (StringUtils.isEmpty(message)) {
            System.out.println("消息為空");
            return isOK;
        }

        rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message);

        isOK = true;

        System.out.println(String.format("WorkProducerA發送字元串消息結果:%s", isOK));

        return isOK;
    }
}
WorkProducerA

相同隊列下另一個生產者:

package com.power.demo.messaging.workqueues;

import com.power.demo.messaging.MQField;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class WorkProducerB {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public boolean send(String message) throws Exception {
        boolean isOK = false;

        if (StringUtils.isEmpty(message)) {
            System.out.println("消息為空");
            return isOK;
        }

        rabbitTemplate.convertAndSend(MQField.MY_WORKER_QUEUE, message);

        isOK = true;

        System.out.println(String.format("WorkProducerB發送字元串消息結果:%s", isOK));

        return isOK;
    }
}
WorkProducerB

消費者:

package com.power.demo.messaging.workqueues;

import com.power.demo.messaging.MQField;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

@Component
public class WorkConsumerA {

    private static AtomicInteger atomicInteger = new AtomicInteger();

    @RabbitListener(queues = MQField.MY_WORKER_QUEUE)
    @RabbitHandler
    public void process(String message) throws Exception {

        int index = atomicInteger.getAndIncrement();

        Thread.sleep(2000);

        System.out.println("WorkConsumerA接收到的字元串消息是 => " + message);

        System.out.println("WorkConsumerA自增序號 => " + index);
    }

}
WorkConsumerA

另一個消費者:

package com.power.demo.messaging.workqueues;

import com.power.demo.messaging.MQField;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

@Component
public class WorkConsumerB {

    private static AtomicInteger atomicInteger = new AtomicInteger();

    @RabbitListener(queues = MQField.MY_WORKER_QUEUE)
    @RabbitHandler
    public void process(String message) throws Exception {

        int index = atomicInteger.getAndIncrement();

        Thread.sleep(10);

        System.out.println("WorkConsumerB接收到的字元串消息是 => " + message);

        System.out.println("WorkConsumerB自增序號 => " + index);
    }

}
View Code

pub/sub

應用最廣泛的發佈/訂閱模式。

官方的說法是:發送多個消息到多個消費者(Sending messages to many consumers at once.)

這個模式和work queues模式最明顯的區別是,隊列Queue前加了一層,多了Exchange(交換器)。

 P為生產者,X為交換器,C1、C2為消費者,中間紅色框表示消息隊列。生產者P將消息不是直接發送到隊列Queue,而是發送到交換器X(註意:交換器Exchange並不存儲消息),然後由交換機X發送給兩個隊列,兩個消費者C1和C2各自監聽一個隊列,來消費消息。

根據交換器類型的不同,又可以分為Fanout、Direct和Topic這三種消費方式,Headers方式實際應用不是非常廣泛,本文暫不討論。

3、fanout

任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有Queue上。

(1)可以理解為路由表的模式

(2)這種模式不需要RoutingKey,即使配置了也忽略

(3)這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定

(4)如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄

Fanout廣播模式實現同一個消息被多個消費者消費,而work queues是同一個消息只能有一個消費者(競爭去)消費。

生產者:

package com.power.demo.messaging.pubsub.fanout;

import com.power.demo.entity.vo.GoodsVO;
import com.power.demo.messaging.MQField;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public boolean send(GoodsVO goodsVO) throws Exception {

        boolean isOK = false;

        if (goodsVO == null) {
            System.out.println("消息為空");
            return isOK;
        }

        rabbitTemplate.convertAndSend(MQField.MY_FANOUT_EXCHANGE, "", goodsVO);

        isOK = true;

        System.out.println(String.format("FanoutSender發送對象消息結果:%s", isOK));

        return isOK;

    }

}
FanoutSender

消費者:

package com.power.demo.messaging.pubsub.fanout;

import com.power.demo.entity.vo.GoodsVO;
import com.power.demo.messaging.MQField;
import com.power.demo.util.SerializeUtil;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutReceiverA {

    @RabbitListener(queues = MQField.MY_FANOUTA_QUEUE)
    @RabbitHandler
    public void process(GoodsVO goodsVO) {
        System.out.println("FanoutReceiverA接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO));
    }
}
FanoutReceiverA

另一個消費者:

package com.power.demo.messaging.pubsub.fanout;

import com.power.demo.entity.vo.GoodsVO;
import com.power.demo.messaging.MQField;
import com.power.demo.util.SerializeUtil;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutReceiverB {

    @RabbitListener(queues = MQField.MY_FANOUTB_QUEUE)
    @RabbitHandler
    public void process(GoodsVO goodsVO) {
        System.out.println("FanoutReceiverB接收到的商品消息是 => " + SerializeUtil.Serialize(goodsVO));
    }
}
FanoutReceiverB

4、direct

Fanout是1對多以廣播的方式,發送給所有的消費者。

Direct則是創建消息隊列的時候,指定一個BindingKey。當發送者發送消息的時候,指定對應的RoutingKey,當RoutingKey和消息隊列的BindingKey一致的時候,消息將會被髮送到該消息隊列中。
Direct廣播模式最明顯不同於Fanout模式的地方是,消費者可以進行消息過濾,有選擇的進行接收想要消費的消息,也就是隊列綁定關鍵字,發送者將數據根據關鍵字發送到Exchange,Exchange根據關鍵字判定應該將數據發送(路由)到指定隊列。

任何發送到Direct Exchange的消息都會被轉發到RoutingKey中指定的Queue。

(1)消息傳遞時需要一個“RoutingKey”,可以簡單的理解為要發送到的隊列名字

(2)如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄

生產者:

package com.power.demo.messaging.pubsub.direct;

import com.power.demo.messaging.MQField;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class DirectSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public boolean sendDirectA(String message) throws Exception {
        boolean isOK = false;

        if (StringUtils.isEmpty(message)) {
            System.out.println("消息為空");
            return isOK;
        }

        rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYA, message);

        isOK = true;

        System.out.println(String.format("DirectSender發送DirectA字元串消息結果:%s", isOK));

        return isOK;
    }

    public boolean sendDirectB(String message) throws Exception {
        boolean isOK = false;

        if (StringUtils.isEmpty(message)) {
            System.out.println("消息為空");
            return isOK;
        }

        rabbitTemplate.convertAndSend(MQField.MY_DIRECT_EXCHANGE, MQField.MY_DIRECT_ROUTINGKEYB, message);

        isOK = true;

        System.out.println(String.format("DirectSender發送DirectB字元串消息結果:%s", isOK));

        return isOK;
    }

}
DirectSender

消費者:

package com.power.demo.messaging.pubsub.direct;

import com.power.demo.messaging.MQField;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectReceiverA {

    @RabbitListener(queues = MQField.MY_DIRECTA_QUEUE)
    @RabbitHandler
    public void process(String message) {
        System.out.println("DirectReceiverA接收到的字元串消息是 => " + message);
    }

}
DirectReceiverA

另一個消費者:

package com.power.demo.messaging.pubsub.direct;

import com.power.demo.messaging.MQField;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectReceiverB {

    @RabbitListener(queues = MQField.MY_DIRECTB_QUEUE)
    @RabbitHandler
    public void process(String message) {
        System.out.println("DirectReceiverB接收到的字元串消息是 => " + message);
    }

}
DirectReceiverB

5、topic

Topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字元串),而當發送消息的時候,只有指定的RoutingKey和該模式相匹配的時候,消息才會被髮送到該消息隊列中。

任何發送到Topic Exchange的消息都會被轉發到所有關心RoutingKey中指定話題的Queue上

(1)每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(RoutingKey),Exchange會將消息轉發到所有關註主題能與RouteKey模糊匹配的隊列

(2)需要RoutingKey,也需要提前綁定Exchange與Queue

(3)在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及log的消息(一個RoutingKey為”mq.log.error”的消息會被轉發到該隊列)

(4)“#”表示0個或若幹個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但“log.#”能與上述兩者都匹配

(5)如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此消息

生產者:

package com.power.demo.messaging.pubsub.topic;

import com.power.demo.messaging.MQField;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 介紹 Vue.js是一套構建用戶界面的漸進式框架。Vue 只關註視圖層,採用自底向上增量開發的設計。Vue 的目標是通過儘可能簡單的 API 實現響應的數據綁定和組合的視圖組件。 安裝node.js 從node官網下載並安裝node,安裝步驟很簡單,只要一路“next”就可以了。安裝完成後,打開命令 ...
  • 新增的標簽和屬性 1、結構標簽 article section aside nav header footer hgroup figure address 2、媒體標簽 video audio embed 3、表單屬性 email url number range ...
  • 相關內容: 首發時間:2018-03-02 修改: 什麼是css選擇器: 介紹: css可以設置標簽的樣式,為了更好的設置樣式以及為了方便給某些標簽指定樣式(批量的給某些標簽增加樣式),所以有了css選擇器,css選擇器可以篩選出指定的標簽。篩選出來之後就可以給對應的標簽設置樣式。 css選擇器的語 ...
  • <!DOCTYPE html><html><head lang="en"> <meta charset="UTF-8"> <title></title> <link rel="stylesheet" href="../css/reset.css"/> <link rel="stylesheet" h ...
  • 1.模型管理 :web線上流程設計器、預覽流程xml、導出xml、部署流程 2.流程管理 :導入導出流程資源文件、查看流程圖、根據流程實例反射出流程模型、激活掛起 3.運行中流程:查看流程信息、當前任務節點、當前流程圖、作廢暫停流程、指派待辦人 4.歷史的流程:查看流程信息、流程用時、流程狀態、查看 ...
  • Timer是一種定時器工具,用來在一個後臺線程計劃執行指定任務。它可以計劃執行一個任務一次或反覆多次。 TimerTask一個抽象類,它的子類代表一個可以被Timer計劃的任務。 ...
  • 安裝docker 好慢....一個小時吧... 啟動docker 先執行命令docker version來來一下: 發現沒有啟動docker server, 執行命令: 然後再查看一下 docker version docker pull命令 pull命令用於拉取鏡像 可以使用docker pull ...
  • 很多Python的程式員都會混淆 迭代器 和 生成器 的概念和作用,分不清到底兩個有什麼區別。今天我們來好好說一說這兩個概念。迭代器(Iterator)Iterator PatternIterator 是一種設計模式,它的作用是,提供一種順序訪問一個聚合對象中的各個元素,但又不需要暴露出其內部實現的... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...