RabbitMQ快速使用代碼手冊

来源:https://www.cnblogs.com/Changes404/archive/2023/06/16/17486179.html
-Advertisement-
Play Games

本篇博客的內容為RabbitMQ在開發過程中的快速上手使用,側重於代碼部分,幾乎沒有相關概念的介紹,相關概念請參考以下csdn博客,兩篇都是我找的精華帖,供大家學習。本篇博客也持續更新~~~ ...


本篇博客的內容為RabbitMQ在開發過程中的快速上手使用,側重於代碼部分,幾乎沒有相關概念的介紹,相關概念請參考以下csdn博客,兩篇都是我找的精華帖,供大家學習。本篇博客也持續更新~~~
內容代碼部分由於word轉md格式有些問題,可以直接查看我的有道雲筆記,鏈接:https://note.youdao.com/s/Ab7Cjiu

參考文檔

csdn博客:

基礎部分:https://blog.csdn.net/qq_35387940/article/details/100514134

高級部分:https://blog.csdn.net/weixin_49076273/article/details/124991012

application.yml

server:

port: 8021

spring:

#給項目來個名字

application:

name: rabbitmq-provider

#配置rabbitMq 伺服器

rabbitmq:

host: 127.0.0.1

port: 5672

username: root

password: root

#虛擬host 可以不設置,使用server預設host

virtual-host: JCcccHost

#確認消息已發送到交換機(Exchange)

#publisher-confirms: true

publisher-confirm-type: correlated

#確認消息已發送到隊列(Queue)

publisher-returns: true

完善更多信息

spring:

rabbitmq:

host: localhost

port: 5672

virtual-host: /

username: guest

password: guest

publisher-confirm-type: correlated

publisher-returns: true

template:

mandatory: true

retry:

#發佈重試,預設false

enabled: true

#重試時間 預設1000ms

initial-interval: 1000

#重試最大次數 最大3

max-attempts: 3

#重試最大間隔時間

max-interval: 10000

#重試的時間隔乘數,比如配2,0
第一次等於10s,第二次等於20s,第三次等於40s

multiplier: 1

listener:

\# 預設配置是simple

type: simple

simple:

\# 手動ack Acknowledge mode of container. auto none

acknowledge-mode: manual

#消費者調用程式線程的最小數量

concurrency: 10

#消費者最大數量

max-concurrency: 10

#限制消費者每次只處理一條信息,處理完在繼續下一條

prefetch: 1

#啟動時是否預設啟動容器

auto-startup: true

#被拒絕時重新進入隊列

default-requeue-rejected: true

相關註解說明

@RabbitListener 註解是指定某方法作為消息消費的方法,例如監聽某 Queue
裡面的消息。

@RabbitListener標註在方法上,直接監聽指定的隊列,此時接收的參數需要與發送市類型一致。

\@Component

public class PointConsumer {

//監聽的隊列名

\@RabbitListener(queues = \"point.to.point\")

public void processOne(String name) {

System.out.println(\"point.to.point:\" + name);

}

}

@RabbitListener 可以標註在類上面,需配合 @RabbitHandler 註解一起使用

@RabbitListener 標註在類上面表示當有收到消息的時候,就交給
@RabbitHandler 的方法處理,根據接受的參數類型進入具體的方法中。

\@Component

\@RabbitListener(queues = \"consumer_queue\")

public class Receiver {

\@RabbitHandler

public void processMessage1(String message) {

System.out.println(message);

}

\@RabbitHandler

public void processMessage2(byte\[\] message) {

System.out.println(new String(message));

}

}

@Payload

可以獲取消息中的 body 信息

\@RabbitListener(queues = \"debug\")

public void processMessage1(@Payload String body) {

System.out.println(\"body:\"+body);

}

@Header,@Headers

可以獲得消息中的 headers 信息

\@RabbitListener(queues = \"debug\")

public void processMessage1(@Payload String body, \@Header String token)
{

System.out.println(\"body:\"+body);

System.out.println(\"token:\"+token);

}

\@RabbitListener(queues = \"debug\")

public void processMessage1(@Payload String body, \@Headers
Map\<String,Object\> headers) {

System.out.println(\"body:\"+body);

System.out.println(\"Headers:\"+headers);

}

快速使用

配置xml文件

<dependency\>

\<groupId\>org.springframework.boot\</groupId\>

\<artifactId\>spring-boot-starter-amqp\</artifactId\>

\</dependency\>

配置exchange、queue

註解快速創建版本

\@Configuration

public class RabbitmqConfig {

//創建交換機

//通過ExchangeBuilder能創建direct、topic、Fanout類型的交換機

\@Bean(\"bootExchange\")

public Exchange bootExchange() {

return
ExchangeBuilder.topicExchange(\"zx_topic_exchange\").durable(true).build();

}

//創建隊列

\@Bean(\"bootQueue\")

public Queue bootQueue() {

return QueueBuilder.durable(\"zx_queue\").build();

}

/\*\*

\* 將隊列與交換機綁定

\*

\* \@param queue

\* \@param exchange

\* \@return

\*/

\@Bean

public Binding bindQueueExchange(@Qualifier(\"bootQueue\") Queue queue,
\@Qualifier(\"bootExchange\") Exchange exchange) {

return
BindingBuilder.bind(queue).to(exchange).with(\"boot.#\").noargs();

}

}

Direct

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@Configuration

public class DirectRabbitConfig {

//隊列 起名:TestDirectQueue

\@Bean

public Queue TestDirectQueue() {

//
durable:是否持久化,預設是false,持久化隊列:會被存儲在磁碟上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效

//
exclusive:預設也是false,只能被當前創建的連接使用,而且當連接關閉後隊列即被刪除。此參考優先順序高於durable

//
autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。

// return new Queue(\"TestDirectQueue\",true,true,false);

//一般設置一下隊列的持久化就好,其餘兩個就是預設false

return new Queue(\"TestDirectQueue\",true);

}

//Direct交換機 起名:TestDirectExchange

\@Bean

DirectExchange TestDirectExchange() {

// return new DirectExchange(\"TestDirectExchange\",true,true);

return new DirectExchange(\"TestDirectExchange\",true,false);

}

//綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting

\@Bean

Binding bindingDirect() {

return
BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(\"TestDirectRouting\");

}

\@Bean

DirectExchange lonelyDirectExchange() {

return new DirectExchange(\"lonelyDirectExchange\");

}

}

Fanout

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@Configuration

public class FanoutRabbitConfig {

/\*\*

\* 創建三個隊列 :fanout.A fanout.B fanout.C

\* 將三個隊列都綁定在交換機 fanoutExchange 上

\* 因為是扇型交換機, 路由鍵無需配置,配置也不起作用

\*/

\@Bean

public Queue queueA() {

return new Queue(\"fanout.A\");

}

\@Bean

public Queue queueB() {

return new Queue(\"fanout.B\");

}

\@Bean

public Queue queueC() {

return new Queue(\"fanout.C\");

}

\@Bean

FanoutExchange fanoutExchange() {

return new FanoutExchange(\"fanoutExchange\");

}

\@Bean

Binding bindingExchangeA() {

return BindingBuilder.bind(queueA()).to(fanoutExchange());

}

\@Bean

Binding bindingExchangeB() {

return BindingBuilder.bind(queueB()).to(fanoutExchange());

}

\@Bean

Binding bindingExchangeC() {

return BindingBuilder.bind(queueC()).to(fanoutExchange());

}

}

Topic

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@Configuration

public class TopicRabbitConfig {

//綁定鍵

public final static String man = \"topic.man\";

public final static String woman = \"topic.woman\";

\@Bean

public Queue firstQueue() {

return new Queue(TopicRabbitConfig.man);

}

\@Bean

public Queue secondQueue() {

return new Queue(TopicRabbitConfig.woman);

}

\@Bean

TopicExchange exchange() {

return new TopicExchange(\"topicExchange\");

}

//將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man

//這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列

\@Bean

Binding bindingExchangeMessage() {

return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);

}

//將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#

// 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列

\@Bean

Binding bindingExchangeMessage2() {

return
BindingBuilder.bind(secondQueue()).to(exchange()).with(\"topic.#\");

}

}

生產者發送消息

直接發送給隊列

//指定消息隊列的名字,直接發送消息到消息隊列中

\@Test

public void testSimpleQueue() {

// 隊列名稱

String queueName = \"simple.queue\";

// 消息

String message = \"hello, spring amqp!\";

// 發送消息

rabbitTemplate.convertAndSend(queueName, message);

}

發送給交換機,然後走不同的模式

////指定交換機的名字,將消息發送給交換機,然後不同模式下,消息隊列根據key得到消息

\@Test

public void testSendDirectExchange() {

// 交換機名稱,有三種類型

String exchangeName = \"itcast.direct\";

// 消息

String message =
\"紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!\";

// 發送消息,red為隊列的key,因此此隊列會得到消息

rabbitTemplate.convertAndSend(exchangeName, \"red\", message);

}

也可以將發送的消息封裝到HashMap中然後發送給交換機

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

import java.time.format.DateTimeFormatter;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@RestController

public class SendMessageController {

\@Autowired

RabbitTemplate rabbitTemplate;
//使用RabbitTemplate,這提供了接收/發送等等方法

\@GetMapping(\"/sendDirectMessage\")

public String sendDirectMessage() {

String messageId = String.valueOf(UUID.randomUUID());

String messageData = \"test message, hello!\";

String createTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd
HH:mm:ss\"));

Map\<String,Object\> map=new HashMap\<\>();

map.put(\"messageId\",messageId);

map.put(\"messageData\",messageData);

map.put(\"createTime\",createTime);

//將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange

rabbitTemplate.convertAndSend(\"TestDirectExchange\",
\"TestDirectRouting\", map);

return \"ok\";

}

}

消費者接收消息

//使用註解@RabbitListener定義當前方法監聽RabbitMQ中指定名稱的消息隊列。

\@Component

public class MessageListener {

\@RabbitListener(queues = \"direct_queue\")

public void receive(String id){

System.out.println(\"已完成簡訊發送業務(rabbitmq direct),id:\"+id);

}

}

參數用Map接收也可以

\@Component

\@RabbitListener(queues = \"TestDirectQueue\")//監聽的隊列名稱
TestDirectQueue

public class DirectReceiver {

\@RabbitHandler

public void process(Map testMessage) {

System.out.println(\"DirectReceiver消費者收到消息 : \" +
testMessage.toString());

}

}

高級特性

消息可靠性傳遞

有confirm和return兩種

在application.yml中添加以下配置項:

server:

port: 8021

spring:

#給項目來個名字

application:

name: rabbitmq-provider

#配置rabbitMq 伺服器

rabbitmq:

host: 127.0.0.1

port: 5672

username: root

password: root

#虛擬host 可以不設置,使用server預設host

virtual-host: JCcccHost

#確認消息已發送到交換機(Exchange)

#publisher-confirms: true

publisher-confirm-type: correlated

#確認消息已發送到隊列(Queue)

publisher-returns: true

有兩種配置方法:

寫到配置類中

寫到工具類或者普通類中,但是這個類得實現那兩個介面

寫法一

編寫消息確認回調函數

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

\@Configuration

public class RabbitConfig {

\@Bean

public RabbitTemplate createRabbitTemplate(ConnectionFactory
connectionFactory){

RabbitTemplate rabbitTemplate = new RabbitTemplate();

rabbitTemplate.setConnectionFactory(connectionFactory);

//設置開啟Mandatory,才能觸發回調函數,無論消息推送結果怎麼樣都強制調用回調函數

rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

\@Override

public void confirm(CorrelationData correlationData, boolean ack, String
cause) {

System.out.println(\"ConfirmCallback:
\"+\"相關數據:\"+correlationData);

System.out.println(\"ConfirmCallback: \"+\"確認情況:\"+ack);

System.out.println(\"ConfirmCallback: \"+\"原因:\"+cause);

}

});

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

\@Override

public void returnedMessage(Message message, int replyCode, String
replyText, String exchange, String routingKey) {

System.out.println(\"ReturnCallback: \"+\"消息:\"+message);

System.out.println(\"ReturnCallback: \"+\"回應碼:\"+replyCode);

System.out.println(\"ReturnCallback: \"+\"回應信息:\"+replyText);

System.out.println(\"ReturnCallback: \"+\"交換機:\"+exchange);

System.out.println(\"ReturnCallback: \"+\"路由鍵:\"+routingKey);

}

});

return rabbitTemplate;

}

}

寫法二

\@Component

\@Slf4j

public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,
RabbitTemplate.ReturnsCallback {

\@Resource

private RedisTemplate\<String, String\> redisTemplate;

\@Resource

private RabbitTemplate rabbitTemplate;

private String finalId = null;

private SmsDTO smsDTO = null;

/\*\*

\* 發佈者確認的回調

\*

\* \@param correlationData 回調的相關數據。

\* \@param b ack為真,nack為假

\* \@param s 一個可選的原因,用於nack,如果可用,否則為空。

\*/

\@Override

public void confirm(CorrelationData correlationData, boolean b, String
s) {

// 消息發送成功,將redis中消息的狀態(status)修改為1

if (b) {

redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +
finalId, \"status\", 1);

} else {

// 發送失敗,放入redis失敗集合中,並刪除集合數據

log.error(\"簡訊消息投送失敗:{}\--\>{}\", correlationData, s);

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);

redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
this.smsDTO);

}

}

/\*\*

\* 發生異常時的消息返回提醒

\*

\* \@param returnedMessage

\*/

\@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

log.error(\"發生異常,返回消息回調:{}\", returnedMessage);

// 發送失敗,放入redis失敗集合中,並刪除集合數據

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);

redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
this.smsDTO);

}

\@PostConstruct

public void init() {

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnsCallback(this);

}

}

消息確認機制

手動確認

yml配置

#手動確認 manual

listener:

simple:

acknowledge-mode: manual

寫法一

首先在消費者項目中創建MessageListenerConfig

import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;

import org.springframework.amqp.core.AcknowledgeMode;

import org.springframework.amqp.core.Queue;

import
org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

\@Configuration

public class MessageListenerConfig {

\@Autowired

private CachingConnectionFactory connectionFactory;

\@Autowired

private MyAckReceiver myAckReceiver;//消息接收處理類

\@Bean

public SimpleMessageListenerContainer simpleMessageListenerContainer() {

SimpleMessageListenerContainer container = new
SimpleMessageListenerContainer(connectionFactory);

container.setConcurrentConsumers(1);

container.setMaxConcurrentConsumers(1);

container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //
RabbitMQ預設是自動確認,這裡改為手動確認消息

//設置一個隊列

container.setQueueNames(\"TestDirectQueue\");

//如果同時設置多個如下: 前提是隊列都是必須已經創建存在的

//
container.setQueueNames(\"TestDirectQueue\",\"TestDirectQueue2\",\"TestDirectQueue3\");

//另一種設置隊列的方法,如果使用這種情況,那麼要設置多個,就使用addQueues

//container.setQueues(new Queue(\"TestDirectQueue\",true));

//container.addQueues(new Queue(\"TestDirectQueue2\",true));

//container.addQueues(new Queue(\"TestDirectQueue3\",true));

container.setMessageListener(myAckReceiver);

return container;

}

}

然後創建手動確認監聽類MyAckReceiver(手動確認模式需要實現ChannelAwareMessageListener)

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;

import java.io.ObjectInputStream;

import java.util.Map;

\@Component

public class MyAckReceiver implements ChannelAwareMessageListener {

\@Override

public void onMessage(Message message, Channel channel) throws Exception
{

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

byte\[\] body = message.getBody();

ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(body));

Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();

String messageId = msgMap.get(\"messageId\");

String messageData = msgMap.get(\"messageData\");

String createTime = msgMap.get(\"createTime\");

ois.close();

System.out.println(\" MyAckReceiver messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);

System.out.println(\"消費的主題消息來自:\"+message.getMessageProperties().getConsumerQueue());

channel.basicAck(deliveryTag, true);
//第二個參數,手動確認可以被批處理,當該參數為 true 時,則可以一次性確認
delivery_tag 小於等於傳入值的所有消息

//channel.basicReject(deliveryTag,
true);//第二個參數,true會重新放回隊列,所以需要自己根據業務邏輯判斷什麼時候使用拒絕

} catch (Exception e) {

channel.basicReject(deliveryTag, false);

e.printStackTrace();

}

}

}

如果想實現不同的隊列,有不同的監聽確認處理機制,做不同的業務處理,那麼這樣做:

首先需要在配置類中綁定隊列,然後只需要根據消息來自不同的隊列名進行區分處理即可

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;

import java.io.ObjectInputStream;

import java.util.Map;

\@Component

public class MyAckReceiver implements ChannelAwareMessageListener {

\@Override

public void onMessage(Message message, Channel channel) throws Exception
{

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

byte\[\] body = message.getBody();

ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(body));

Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();

String messageId = msgMap.get(\"messageId\");

String messageData = msgMap.get(\"messageData\");

String createTime = msgMap.get(\"createTime\");

ois.close();

if
(\"TestDirectQueue\".equals(message.getMessageProperties().getConsumerQueue())){

System.out.println(\"消費的消息來自的隊列名為:\"+message.getMessageProperties().getConsumerQueue());

System.out.println(\"消息成功消費到 messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);

System.out.println(\"執行TestDirectQueue中的消息的業務處理流程\...\...\");

}

if
(\"fanout.A\".equals(message.getMessageProperties().getConsumerQueue())){

System.out.println(\"消費的消息來自的隊列名為:\"+message.getMessageProperties().getConsumerQueue());

System.out.println(\"消息成功消費到 messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);

System.out.println(\"執行fanout.A中的消息的業務處理流程\...\...\");

}

channel.basicAck(deliveryTag, true);

//channel.basicReject(deliveryTag, true);//為true會重新放回隊列

} catch (Exception e) {

channel.basicReject(deliveryTag, false);

e.printStackTrace();

}

}

}

寫法二

\@Component

\@Slf4j

public class SendSmsListener {

\@Resource

private RedisTemplate\<String, String\> redisTemplate;

\@Resource

private SendSmsUtils sendSmsUtils;

/\*\*

\* 監聽發送簡訊普通隊列

\* \@param smsDTO

\* \@param message

\* \@param channel

\* \@throws IOException

\*/

\@RabbitListener(queues = SMS_QUEUE_NAME)

public void sendSmsListener(SmsDTO smsDTO, Message message, Channel
channel) throws IOException {

String messageId = message.getMessageProperties().getMessageId();

int retryCount = (int)
redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +
messageId, \"retryCount\");

if (retryCount \> 3) {

//重試次數大於3,直接放到死信隊列

log.error(\"簡訊消息重試超過3次:{}\", messageId);

//basicReject方法拒絕deliveryTag對應的消息,第二個參數是否requeue,true則重新入隊列,否則丟棄或者進入死信隊列。

//該方法reject後,該消費者還是會消費到該條被reject的消息。

channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);

return;

}

try {

String phoneNum = smsDTO.getPhoneNum();

String code = smsDTO.getCode();

if(StringUtils.isAnyBlank(phoneNum,code)){

throw new RuntimeException(\"sendSmsListener參數為空\");

}

// 發送消息

SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,
code);

SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();

SendStatus sendStatus = sendStatusSet\[0\];

if(!\"Ok\".equals(sendStatus.getCode()) \|\|!\"send
success\".equals(sendStatus.getMessage())){

throw new RuntimeException(\"發送驗證碼失敗\");

}

//手動確認消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

log.info(\"簡訊發送成功:{}\",smsDTO);

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);

} catch (Exception e) {

redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,\"retryCount\",retryCount+1);

channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

}

}

/\*\*

\* 監聽到發送簡訊死信隊列

\* \@param sms

\* \@param message

\* \@param channel

\* \@throws IOException

\*/

\@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)

public void smsDelayQueueListener(SmsDTO sms, Message message, Channel
channel) throws IOException {

try{

log.error(\"監聽到死信隊列消息==\>{}\",sms);

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}catch (Exception e){

channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

}

}

}

消費端限流

#配置RabbitMQ

spring:

rabbitmq:

host: 192.168.126.3

port: 5672

username: guest

password: guest

virtual-host: /

#開啟自動確認 none 手動確認 manual

listener:

simple:

#消費端限流機制必須開啟手動確認

acknowledge-mode: manual

#消費端最多拉取的消息條數,簽收後不滿該條數才會繼續拉取

prefetch: 5

消息存活時間TTL

可以設置隊列的存活時間,也可以設置具體消息的存活時間

設置隊列中所有消息的存活時間

return QueueBuilder

.durable(QUEUE_NAME)//隊列持久化

.ttl(10000)//設置隊列的所有消息存活10s

.build();

即在創建隊列時,設置存活時間

設置某條消息的存活時間

//發送消息,並設置該消息的存活時間

\@Test

public void testSendMessage()

{

//1.創建消息屬性

MessageProperties messageProperties = new MessageProperties();

//2.設置存活時間

messageProperties.setExpiration(\"10000\");

//3.創建消息對象

Message message = new
Message(\"sendMessage\...\".getBytes(),messageProperties);

//4.發送消息

rabbitTemplate.convertAndSend(\"my_topic_exchange1\",\"my_routing\",message);

}

若設置中間的消息的存活時間,當過期時,該消息不會被移除,但是該消息已經不會被消費了,需要等到該消息到隊里頂端才會被移除。因為隊列是頭出,尾進,故而要移除它需要等到它在頂端時才可以。

在隊列設置存活時間,也在單條消息設置存活時間,則以時間短的為準

死信隊列

死信隊列和普通隊列沒有任何區別,只需要將普通隊列需要綁定死信交換機和死信隊列就能夠實現功能

import org.springframework.amqp.core.\*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

\@Configuration//Rabbit配置類

public class RabbitConfig4 {

private final String DEAD_EXCHANGE = \"dead_exchange\";

private final String DEAD_QUEUE = \"dead_queue\";

private final String NORMAL_EXCHANGE = \"normal_exchange\";

private final String NORMAL_QUEUE = \"normal_queue\";

//創建死信交換機

\@Bean(DEAD_EXCHANGE)

public Exchange deadExchange()

{

return ExchangeBuilder

.topicExchange(DEAD_EXCHANGE)//交換機類型 ;參數為名字
topic為通配符模式的交換機

.durable(true)//是否持久化,true即存到磁碟,false只在記憶體上

.build();

}

//創建死信隊列

\@Bean(DEAD_QUEUE)

public Queue deadQueue()

{

return QueueBuilder

.durable(DEAD_QUEUE)//隊列持久化

//.maxPriority(10)//設置隊列的最大優先順序,最大可以設置255,但官網推薦不超過10,太高比較浪費資源

.build();

}

//死信交換機綁定死信隊列

\@Bean

//@Qualifier註解,使用名稱裝配進行使用

public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange
exchange, \@Qualifier(DEAD_QUEUE) Queue queue)

{

return BindingBuilder

.bind(queue)

.to(exchange)

.with(\"dead_routing\")

.noargs();

}

//創建普通交換機

\@Bean(NORMAL_EXCHANGE)

public Exchange normalExchange()

{

return ExchangeBuilder

.topicExchange(NORMAL_EXCHANGE)//交換機類型 ;參數為名字
topic為通配符模式的交換機

.durable(true)//是否持久化,true即存到磁碟,false只在記憶體上

.build();

}

//創建普通隊列

\@Bean(NORMAL_QUEUE)

public Queue normalQueue()

{

return QueueBuilder

.durable(NORMAL_QUEUE)//隊列持久化

//.maxPriority(10)//設置隊列的最大優先順序,最大可以設置255,但官網推薦不超過10,太高比較浪費資源

.deadLetterExchange(DEAD_EXCHANGE)//綁定死信交換機

.deadLetterRoutingKey(\"dead_routing\")//死信隊列路由關鍵字

.ttl(10000)//消息存活10s

.maxLength(10)//隊列最大長度為10

.build();

}

//普通交換機綁定普通隊列

\@Bean

//@Qualifier註解,使用名稱裝配進行使用

public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange
exchange, \@Qualifier(NORMAL_QUEUE) Queue queue)

{

return BindingBuilder

.bind(queue)

.to(exchange)

.with(\"my_routing\")

.noargs();

}

}

延遲隊列

RabbitMQ並未實現延遲隊列功能,所以可以通過死信隊列實現延遲隊列的功能

即給普通隊列設置存活時間30分鐘,過期後發送至死信隊列,在死信消費者監聽死信隊列消息,查看訂單狀態,是否支付,未支付則取消訂單,回退庫存即可。

消費者監聽延遲隊列

\@Component

public class ExpireOrderConsumer {

//監聽過期訂單隊列

\@RabbitListener(queues = \"expire_queue\")

public void listenMessage(String orderId)

{

//模擬處理資料庫等業務

System.out.println(\"查詢\"+orderId+\"號訂單的狀態,如果已支付無需處理,如果未支付則回退庫存\");

}

}

控制層代碼

\@RestController

public class OrderController {

\@Autowired

private RabbitTemplate rabbitTemplate;

\@RequestMapping(value = \"/place/{orderId}\",method =
RequestMethod.GET)

public String placeOrder(@PathVariable String orderId)

{

//模擬service層處理

System.out.println(\"處理訂單數據\...\");

//將訂單id發送到訂單隊列

rabbitTemplate.convertAndSend(\"order_exchange\",\"order_routing\",orderId);

return \"下單成功,修改庫存\";

}

}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • > 隨著人工智慧技術的不斷發展,阿裡體育等IT大廠,推出的“樂動力”、“天天跳繩”AI運動APP,讓**雲上運動會、線上運動會、健身打卡、AI體育指導**等概念空前火熱。那麼,能否將這些在APP成功應用的場景搬上小程式,分享這些概念的紅利呢?本系列文章就帶您一步一步從零開始開發一個AI運動小程式,本 ...
  • EBT3002串口伺服器晶元簡介 EBT3002 乙太網晶元是實現8路TTL串口數據與乙太網數據互相轉換的乙太網串口伺服器晶元;該乙太網晶元具有多種Modbus網關模式以及TCP/DUP/MQTT/HTTP物聯網網關模式,可滿足各類串口IO聯網設備與PLC設備的聯網功能。乙太網晶元採用採用LQFP1 ...
  • 確保密碼的安全性是非常重要的,以下是幾種常見的方法來提高密碼的安全性: 1. 使用加密傳輸:在密碼提交到後端之前,確保使用安全的加密協議(如HTTPS)來加密數據傳輸,以防止密碼被中間人攻擊竊取。 2. 密碼哈希:在後端接收到密碼後,使用密碼哈希演算法(如SHA-256)對密碼進行哈希處理,將密碼轉換 ...
  • 某日二師兄參加XXX科技公司的C++工程師開發崗位第16面: > 面試官:什麼是左值,什麼是右值? > > 二師兄:簡單來說,左值就是可以使用`&`符號取地址的值,而右值一般不可以使用`&`符號取地址。 ```c++ int a = 42; //a是左值,可以&a int* p = &a; int* ...
  • ## 一、初衷: 因為想要進行各種技術點的訓練和學習,開發中需要使用各種各樣的開源技術框架,苦於沒有基礎服務支撐,所以想要建立一個專門的服務支撐系統,每年購買的雲伺服器配置底下,安裝一個Jenkins都跑不起來,所以自己購買了一個物理主機,記憶體加裝到`32G`,搭建自己的私人技術知識星球。 搭建一套 ...
  • - 方法和函數的作用幾乎是一樣的,但是函數在使用的過程中更加靈活和多樣化 - scala中函數是頭等公民 . 可以作為方法的返回值和參數使用 - scala是一個集面向對象和麵向函數於一身的編程語言 , 靈活的函數是函數式編程的一種體現 - 函數的使用使代碼更加簡潔和靈活 # 函數 scala中一種 ...
  • > 最近項目中使用到了 lua,因為之前沒怎麼接觸過,特此記錄下自己在學習過程中疑惑的地方。 在使用`lua` 進行編碼的過程中,我們經常會使用到`.`和`:`,但是對於剛開始接觸`lua`的我來說,對這兩者的使用還是感到時常感到疑惑,接下來我們一起看看幾個例子,來感受兩者的區別。如果發現文中錯誤的 ...
  • # Go Redis 管道和事務之 go-redis ## [Go Redis 管道和事務官方文檔介紹](https://redis.uptrace.dev/zh/guide/go-redis-pipelines.html) Redis pipelines(管道) 允許一次性發送多個命令來提高性能, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...