##### RabbitMQ安裝 ``` docker run -d --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbi ...
RabbitMQ安裝
docker run -d --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:3.8.15-management
#網路安全組記得開放埠
4369 erlang 發現口
5672 client 端通信口
15672 管理界面 ui 埠
25672 server 間內部通信口
訪問管理界面
ip:15672
依賴引入
<!--引入AMQP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.xml
spring:
##----------rabbit配置--------------
rabbitmq:
host: 192.168.75.146
port: 5672
virtual-host: dev
username: admin
password: password
listener:
simple:
#消息確認方式,manual(手動ack) 和auto(自動ack) 。消息隊列重試到達次數進入異常交換機--為實現,該策略要為auto
acknowledge-mode: auto
retry:
#開啟重試,消費者代碼不能添加try catch捕獲不往外拋異常
enabled: true
#最大重試次數
max-attempts: 4
# 重試消息的時間間隔,5秒
max-interval: 5000
RabbitMQ配置文件 (一個交換機,兩個隊列,routingKey匹配規則適用於兩隊列)
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交換機
*/
private String shortLinkEventExchange="short_link.event.exchange";
/**
* 創建交換機 Topic類型
* 一般一個微服務一個交換機
* @return
*/
@Bean
public Exchange shortLinkEventExchange(){
return new TopicExchange(shortLinkEventExchange,true,false);
//return new FanoutExchange(shortLinkEventExchange,true,false);
}
//新增短鏈相關配置====================================
/**
* 新增短鏈 隊列
*/
private String shortLinkAddLinkQueue="short_link.add.link.queue";
/**
* 新增短鏈映射 隊列
*/
private String shortLinkAddMappingQueue="short_link.add.mapping.queue";
/**
* 新增短鏈具體的routingKey,【發送消息使用】
*/
private String shortLinkAddRoutingKey="short_link.add.link.mapping.routing.key";
/**
* topic類型的binding key,用於綁定隊列和交換機,是用於 link 消費者
*/
private String shortLinkAddLinkBindingKey="short_link.add.link.*.routing.key";
/**
* topic類型的binding key,用於綁定隊列和交換機,是用於 mapping 消費者
*/
private String shortLinkAddMappingBindingKey="short_link.add.*.mapping.routing.key";
/**
* 新增短鏈api隊列和交換機的綁定關係建立
*/
@Bean
public Binding shortLinkAddApiBinding(){
return new Binding(shortLinkAddLinkQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddLinkBindingKey,null);
}
/**
* 新增短鏈mapping隊列和交換機的綁定關係建立
*/
@Bean
public Binding shortLinkAddMappingBinding(){
return new Binding(shortLinkAddMappingQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange,shortLinkAddMappingBindingKey,null);
}
/**
* 新增短鏈Link普通隊列,用於被監聽
*/
@Bean
public Queue shortLinkAddLinkQueue(){
return new Queue(shortLinkAddLinkQueue,true,false,false);
}
/**
* 新增短鏈mapping 普通隊列,用於被監聽
*/
@Bean
public Queue shortLinkAddMappingQueue(){
return new Queue(shortLinkAddMappingQueue,true,false,false);
}
對應的兩個消費者
@Component
@Slf4j
//@RabbitListener(queues = "short_link.add.link.queue")
@RabbitListener(queuesToDeclare = { @Queue("short_link.add.link.queue") })
public class ShortLinkAddLinkMQListener {
@Autowired
private ShortLinkService shortLinkService;
@RabbitHandler
public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException {
log.info("ShortLinkAddLinkMQListener message:{}",message);
try{
eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_LINK.name());
shortLinkService.handlerAddShortLink(eventMessage);
}catch (Exception e){
// 處理業務異常,等其他操作
log.error("消費失敗:{}",eventMessage);
throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
}
log.info("消費成功:{}",eventMessage);
//確認消息消費成功
// channel.basicAck(tag,false);
}
}
@Component
@Slf4j
//@RabbitListener(queues = "short_link.add.mapping.queue")
@RabbitListener(queuesToDeclare = { @Queue("short_link.add.mapping.queue") })
public class ShortLinkAddMappingMQListener {
@Autowired
private ShortLinkService shortLinkService;
@RabbitHandler
public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel) throws IOException {
log.info("ShortLinkAddMappingMQListener message:{}",message);
try{
eventMessage.setEventMessageType(EventMessageType.SHORT_LINK_ADD_MAPPING.name());
shortLinkService.handlerAddShortLink(eventMessage);
}catch (Exception e){
// 處理業務異常,等其他操作
log.error("消費失敗:{}",eventMessage);
throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
}
log.info("消費成功:{}",eventMessage);
//確認消息消費成功
// channel.basicAck(tag,false);
}
}
實戰,這裡直接在controller層中編碼。(應該在Service實現)
@RestController
@RequestMapping("/api/link/v1")
public class ShortLinkController {
@Autowired
private ShortLinkService shortLinkService;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 新增短鏈
* @param shortLinkAddRequest
* @return
*/
@PostMapping("add")
public JsonData createShortLink(@RequestBody ShortLinkAddRequest shortLinkAddRequest){
//參數:交換機、匹配規則Key、信息對象
EventMessage eventMessage = EventMessage.builder().[設置對象各欄位信息]
.build();
//生成MQ。
rabbitTemplate.convertAndSend(rabbitMQConfig.getShortLinkEventExchange(), rabbitMQConfig.getShortLinkAddRoutingKey(), eventMessage);
return jsonData;
}
}