MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅動架構中的Broker。 快速入門 1.publisher實現 public class PublisherTest { @Test public void testSendMessage() throws I ...
MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅動架構中的Broker。
快速入門
1.publisher實現
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設置連接參數,分別是:主機名、埠號、vhost、用戶名、密碼
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("rabbit");
factory.setPassword("123321");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創建通道Channel
Channel channel = connection.createChannel();
// 3.創建隊列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.發送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("發送消息成功:【" + message + "】");
// 5.關閉通道和連接
channel.close();
connection.close();
}
}
2.consumer實現
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設置連接參數,分別是:主機名、埠號、vhost、用戶名、密碼
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創建通道Channel
Channel channel = connection.createChannel();
// 3.創建隊列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.訂閱消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.處理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
SpringAMQP-集成SpringBoot
1.Basic Queue 簡單隊列模型
a.導入依賴
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
b.消息發送
首先配置MQ地址,在publisher服務的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 主機名
port: 5672 # 埠
virtual-host: / # 虛擬主機
username: rabbit # 用戶名
password: 123321 # 密碼
然後在publisher服務中編寫測試類SpringAmqpTest,並利用RabbitTemplate實現消息發送:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 隊列名稱
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 發送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
c.消息接收
首先配置MQ地址,在consumer服務的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 主機名
port: 5672 # 埠
virtual-host: / # 虛擬主機
username: rabbit # 用戶名
password: 123321 # 密碼
然後在consumer服務的cn.itcast.mq.listener
包中新建一個類SpringRabbitListener,代碼如下:
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消費者接收到消息:【" + msg + "】");
}
}
2.WorkQueue-任務隊列
Work queues,也被稱為(Task queues),任務模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。
當消息處理比較耗時的時候,可能生產消息的速度會遠遠大於消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。
此時就可以使用work 模型,多個消費者共同處理消息處理,速度就能大大提高了。
但在預設情況下,消息是平均分配給每個消費者,並沒有考慮到消費者的處理能力。這樣顯然是有問題的。
在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息
3.Fanout-廣播
在廣播模式下,消息發送流程是這樣的:
- 1) 可以有多個隊列
- 2) 每個隊列都要綁定到Exchange(交換機)
- 3) 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定
- 4) 交換機把消息發送給綁定過的所有隊列
- 5) 訂閱隊列的消費者都能拿到消息
a.聲明隊列和交換機
Spring提供了一個介面Exchange,來表示所有不同類型的交換機,
在consumer中創建一個類,聲明隊列和交換機:
@Configuration
public class FanoutConfig {
/**
* 聲明交換機
* @return Fanout類型交換機
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("rabbit.fanout");
}
/**
* 第1個隊列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 綁定隊列和交換機
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2個隊列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 綁定隊列和交換機
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
b.消息發送
在publisher服務的SpringAmqpTest類中添加測試方法:
@Test
public void testFanoutExchange() {
// 隊列名稱
String exchangeName = "rabbit.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
c.消息接收
在consumer服務的SpringRabbitListener中添加兩個方法,作為消費者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}
4.Direct-指定
在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
- 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) - 消息的發送方在 向 Exchange發送消息時,也必須指定消息的
RoutingKey
。 - Exchange不再把消息交給每一個綁定的隊列,而是根據消息的
Routing Key
進行判斷,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
案例需求如下:
-
利用@RabbitListener聲明Exchange、Queue、RoutingKey
-
在consumer服務中,編寫兩個消費者方法,分別監聽direct.queue1和direct.queue2
-
在publisher中編寫測試方法,向itcast. direct發送消息
a.基於註解聲明隊列和交換機
基於@Bean的方式聲明隊列和交換機比較麻煩,Spring還提供了基於註解方式來聲明。
在consumer的SpringRabbitListener中添加兩個消費者,同時基於註解來聲明隊列和交換機:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "rabbit.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消費者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "rabbit.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消費者接收到direct.queue2的消息:【" + msg + "】");
}
b.消息發送
在publisher服務的SpringAmqpTest類中添加測試方法:
@Test
public void testSendDirectExchange() {
// 交換機名稱
String exchangeName = "rabbit.direct";
// 消息
String message = "紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!";
// 發送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
5.Topic-主題
Topic
類型的Exchange
與Direct
相比,都是可以根據RoutingKey
把消息路由到不同的隊列。只不過Topic
類型Exchange
可以讓隊列在綁定Routing key
的時候使用通配符!
案例需求:
實現思路如下:
-
並利用@RabbitListener聲明Exchange、Queue、RoutingKey
-
在consumer服務中,編寫兩個消費者方法,分別監聽topic.queue1和topic.queue2
-
在publisher中編寫測試方法,向itcast. topic發送消息
a.消息發送
在publisher服務的SpringAmqpTest類中添加測試方法:
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交換機名稱
String exchangeName = "rabbit.topic";
// 消息
String message = "喜報!孫悟空大戰哥斯拉,勝!";
// 發送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
b.消息接收
在consumer服務的SpringRabbitListener中添加方法:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "rabbit.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消費者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "rabbit.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消費者接收到topic.queue2的消息:【" + msg + "】");
}
6.消息轉換器
Spring會把你發送的消息序列化為位元組發送給MQ,接收消息的時候,還會把位元組反序列化為Java對象。
只不過,預設情況下Spring採用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:
- 數據體積過大
- 有安全漏洞
- 可讀性差
因此可以使用JSON方式來做序列化和反序列化。
在publisher和consumer兩個服務中都引入依賴:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.5</version>
</dependency>
配置消息轉換器。
在啟動類中添加一個Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
#安裝(Docker)
1.拉取鏡像
docker pull rabbitmq:3-management
2.部署容器
docker run \
-e RABBITMQ_DEFAULT_USER=rabbit \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
--restart=always \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management