## Passing parameters and serialization with RabbitMQ in Spring Boot (Part 2)
Passing message parameters is another pitfall in day-to-day development: whether you use the built-in `SimpleMessageConverter` or `Jackson2JsonMessageConverter`, neither lets the consumer receive dynamically typed parameters out of the box.
### 1. The serialization problem
First, here is the concrete code together with the test case:
- Consumer
@RabbitListener(queues = "text.queue")
@RabbitHandler(isDefault = true)
public void exec(@Payload Map dto, Message message, Channel channel) {
    // Note: the type of the message sent must implement Serializable,
    // and the consumer's parameter type cannot be chosen arbitrarily!
    LOG.info(RabbitMQCfgEnum.TEXT + " received message: {}", dto);
    // Only needed when manual acknowledgement is configured
    // channel.basicAck(message.getMessageProperties().getDeliveryTag(), Boolean.TRUE);
}
- Producer (test case)
private Connection buildConnection() throws Exception {
    // 1. Create the connection factory (the client uses a factory to create connections)
    ConnectionFactory factory = new ConnectionFactory();
    // 2. Set the connection details on the factory (ip, port, virtual host, username, password)
    factory.setHost("10.156.122.215"); // IP address of the server where RabbitMQ is installed
    factory.setPort(5672);             // port number
    factory.setVirtualHost("vhost");   // virtual host name
    // RabbitMQ manages access through users
    factory.setUsername("shadow");     // user name
    factory.setPassword("shadow");     // user password
    // 3. Obtain a connection from the factory
    Connection connection = factory.newConnection();
    return connection;
}

@Test
public void test02() throws Exception {
    Connection connection = this.buildConnection();
    Channel channel = connection.createChannel();
    Map<String, Object> data = new HashMap<>(4);
    data.put("ordeNo", SeqGenUtil.genSeq());
    data.put("timestamp", System.currentTimeMillis());
    String json = JacksonUtil.toJsonString(data);
    // The message properties argument is null, so only the raw UTF-8 bytes of the JSON string are sent
    channel.basicPublish(RabbitMQCfgEnum.TEXT.exchange, RabbitMQCfgEnum.TEXT.routingKey, null, json.getBytes(StandardCharsets.UTF_8));
    // Close the channel and the connection
    channel.close();
    connection.close();
}
Running the test case, it unexpectedly throws an error:
2023-07-17 14:10:24.037 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN o.s.a.rabbit.retry.RejectAndDontRequeueRecoverer:74 - Retries exhausted for message (Body:'[B@3ad45d58(byte[55])' MessageProperties [headers={}, contentLength=0, redelivered=false, receivedExchange=text_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-IenKeUIVEFy1vBz1jQenVw, consumerQueue=text.queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(java.util.Map,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@7839ec46]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
at org.springframework.amqp.rabbit.listener.$Proxy107.invokeListener(Unknown Source)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.util.Map] for GenericMessage [payload=byte[55], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=c28511a8-1bbe-9d57-879c-c5763ba40129, amqp_consumerTag=amq.ctag-IenKeUIVEFy1vBz1jQenVw, amqp_lastInBatch=false, timestamp=1689574224032}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
... 27 common frames omitted
2023-07-17 14:10:24.038 -> [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] -> WARN o.s.a.r.listener.ConditionalRejectingErrorHandler:170 - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76)
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean.recover(StatelessRetryOperationsInterceptorFactoryBean.java:78)
at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:157)
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:122)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:220)
at org.springframework.amqp.rabbit.listener.$Proxy107.invokeListener(Unknown Source)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: null
... 19 common frames omitted
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mee.api.common.service.impl.MQConsumerTextHandler.exec(java.util.Map,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)]
Bean [com.mee.api.common.service.impl.MQConsumerTextHandler@7839ec46]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:97)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
... 14 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.util.Map] for GenericMessage [payload=byte[55], headers={amqp_receivedExchange=text_exchange, amqp_deliveryTag=1, amqp_consumerQueue=text.queue, amqp_redelivered=false, id=c28511a8-1bbe-9d57-879c-c5763ba40129, amqp_consumerTag=amq.ctag-IenKeUIVEFy1vBz1jQenVw, amqp_lastInBatch=false, timestamp=1689574224032}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$OptionalEmptyAwarePayloadArgumentResolver.resolveArgument(RabbitListenerAnnotationBeanPostProcessor.java:1053)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:77)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:263)
... 27 common frames omitted
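As the trace shows, the JSON string cannot be deserialized into a `Map`: the payload reaches the listener as a raw `byte[]` (printed as `[B`), and the root cause is `MessageConversionException: Cannot convert from [[B] to [java.util.Map]`.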
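
To make the error text more concrete: `[B` is the JVM's internal name for `byte[]`, which is what the producer published via `json.getBytes(StandardCharsets.UTF_8)`. The sketch below uses only the JDK, with no Spring or RabbitMQ involved. The class name `PayloadBytesDemo`, its helper methods and the sample JSON value are made up for illustration. It only shows that the body is plain UTF-8 text that has to be decoded, and then parsed by a JSON library, before it can become a `Map`.

```java
import java.nio.charset.StandardCharsets;

public class PayloadBytesDemo {

    // Mirrors what the producer does: JSON text -> raw UTF-8 bytes.
    public static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // The first step a consumer needs when it is handed the raw body.
    public static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical sample payload; the real one carries a generated order number.
        String json = "{\"ordeNo\":\"demo-1\",\"timestamp\":1689574224032}";
        byte[] body = encode(json);

        // Prints the runtime class name of the payload, the same name used in the exception text.
        System.out.println(body.getClass().getName());
        // Decoding restores the JSON text; a JSON library can then turn it into a Map.
        System.out.println(decode(body));
    }
}
```

Seen this way, the failure is less about the JSON itself and more about nothing on the consumer side performing the bytes-to-object step for a `Map` parameter.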