§1 RabbitMQ延遲隊列 RabbitMQ延遲隊列,主要是藉助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges)來實現。 涉及到2個隊列,一個用於發送消息,一個用於消息過期後的轉發目標隊列。 本例中, 定義2組exchange和queu ...
§1 RabbitMQ延遲隊列
RabbitMQ延遲隊列,主要是藉助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges)來實現。
涉及到2個隊列,一個用於發送消息,一個用於消息過期後的轉發目標隊列。
本例中, 定義2組exchange和queue。
agentpayquery1exchange agentpayquery1queue(routingkey為delay) agentpayquery2exchange agentpayquery2queue(routingkey為delay)
agentpayquery1queue是緩衝隊列,消息過期路由到agentpayquery2queue
§2 生產者
生產者配置:
<!-- 連接服務配置 --> <rabbit:connection-factory id="connectionFactoryProducer" addresses="${mq.ip}" //192.168.40.40:5672 username="${username}" password="${password}" channel-cache-size="${cache.size}" publisher-confirms="${publisher.confirms}" publisher-returns="${publisher.returns}" virtual-host="/" /> <!--========================出款查詢 延遲隊列配置 begin =========================--> <rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue"/> <rabbit:direct-exchange name="agentpayquery2exchange" durable="true" auto-delete="false" id="agentpayquery2exchange"> <rabbit:bindings> <rabbit:binding queue="agentpayquery2queue" key="delay" /> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:queue id="agentpayquery1queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery1queue" > <rabbit:queue-arguments> <entry key="x-dead-letter-exchange" value="agentpayquery2exchange"/> </rabbit:queue-arguments> </rabbit:queue> <rabbit:direct-exchange name="agentpayquery1exchange" durable="true" auto-delete="false" id="agentpayquery1exchange"> <rabbit:bindings> <rabbit:binding queue="agentpayquery1queue" key="delay" /> </rabbit:bindings> </rabbit:direct-exchange> <!--定義RabbitTemplate實例--> <rabbit:template id="agentpayQueryMsgTemplate" exchange="agentpayquery1exchange" routing-key="delay" queue="agentpayquery1queue" connection-factory="connectionFactoryProducer" message-converter="mqMessageConverter" mandatory="true" confirm-callback="publisherConfirmsReturns" return-callback="publisherConfirmsReturns"/> <!--========================出款查詢 延遲隊列配置 end =========================-->
生產者消息入隊:
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class AgentpayQueryProducer { private static final Logger log = LogManager.getLogger(AgentpayQueryProducer.class.getSimpleName()); @Autowired private RabbitTemplate agentpayQueryMsgTemplate; public void sendDelay(String message, int delaySeconds) { String expiration = String.valueOf(delaySeconds * 1000); agentpayQueryMsgTemplate.convertAndSend((Object) message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(expiration); message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); log.info("出款查詢數據入隊:{}", new String(message.getBody())); return message; } }); } }
§3消費者
消費端的配置無他:
<!-- 連接服務配置 channel-cache-size="25" --> <rabbit:connection-factory id="connectionFactory" addresses="${mq.ip}" username="${username}" password="${password}" /> <bean id="agentpayQueryConsumer" class="com.emaxcard.rpc.payment.service.impl.batchpay.AgentpayQueryConsumer" /> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象--> <rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue" /> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" max-concurrency="20" concurrency="10" prefetch="10"> <rabbit:listener ref="agentpayQueryConsumer" queues="agentpayquery2queue" /> </rabbit:listener-container>
消息消費:
import com.alibaba.fastjson.JSON; import com.emaxcard.enums.BatchPayStatus; import com.emaxcard.exceptions.ResponseException; import com.emaxcard.payment.vo.PaymentRecord; import com.emaxcard.rpc.payment.model.PaymentRecordModel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.beans.factory.annotation.Autowired; public class AgentpayQueryConsumer implements MessageListener { private static final Logger log = LogManager.getLogger(); @Autowired QueryGatewayService queryGatewayService; @Autowired AgentpayQueryProducer agentpayQueryProducer; @Override public void onMessage(Message message) { String mqMsg = new String(message.getBody()); log.info("出款查詢數據出隊:{}", mqMsg); PaymentRecord paymentRecordModel; try { paymentRecordModel = JSON.parseObject(mqMsg, PaymentRecord.class); } catch (Exception ex) { log.info("消息格式不是PaymentRecordModel,結束。"); return; } try { BatchPayStatus payStatus = queryGatewayService.queryGateway(paymentRecordModel); // 非終態,繼續放入延遲隊列 if (BatchPayStatus.SUCCESS != payStatus && BatchPayStatus.FAILED != payStatus) { if (BatchPayStatus.NOTEXIST == payStatus) { log.info("查詢結果是{},不再處理", payStatus); } else { agentpayQueryProducer.sendDelay(mqMsg, 10); } } } catch (Exception ex) { if (ex instanceof ResponseException) { log.info("轉賬查詢{},paymentId{},處理錯誤:{}", paymentRecordModel.getTransNo(), paymentRecordModel.getPaymentId(), ex.getMessage()); } else { log.error("處理消息異常:", ex); } } } }
§4 使用延遲隊列要註意
1. 因為是隊列,所以即使一個消息比在同一隊列中的其他消息提前過期,提前過期的也不會優先進入死信隊列,它們還是按照入庫的順序讓消費者消費。如果第一進去的消息過期時間是1小時,那麼死信隊列的消費者也許等1小時才能收到第一個消息。
2. 當緩衝隊列里一旦出現未設置過期時間的消息,那麼就會造成整個隊列堵塞。消費端也無法消費到消息。通過日誌可以看到,列印出來的都是 BlockingQueueConsumer。
Get messages Ack Mode選擇“Ack message requeue false”,可以將消息消費掉