1、非同步消息 當一個消息發送時候,消息會被交給消息代理,消息代理可以確保消息被髮送到指定的目的地,同時解放發送者,使其能夠繼續進行其它業務。消息代理通常有ActiveMQ、RabbitMQ...,目的地通常有隊列和主題,隊列採用點對點的模型,主題採用發佈訂閱模型 點對點模型:消息隊列可以有多個接受者 ...
1、非同步消息
當一個消息發送時候,消息會被交給消息代理,消息代理可以確保消息被髮送到指定的目的地,同時解放發送者,使其能夠繼續進行其它業務。消息代理通常有ActiveMQ、RabbitMQ...,目的地通常有隊列和主題,隊列採用點對點的模型,主題採用發佈訂閱模型
- 點對點模型:消息隊列可以有多個接受者,但每條消息只能被一個接收者取走
- 發佈訂閱模型:消息隊列可以有多個訂閱者,每條消息可以發送給多個主題訂閱者
2、JMS發送/接收消息
1)activemq配置,使用ActiveMQ,並使用了JMSTemplate。
JMS模板為開發者提供了與消息代理進行交互發送和接收消息的標準API,幾乎每個消息代理都支持JMS。Jms模板和spring Date提供的jdbc模板一樣可以消除樣板代碼,讓開發更集中在業務處理上。
<!-- ActiveMQ連接工廠 --> <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"></property> </bean> <!-- 消息隊列 --> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="userqueue"></constructor-arg> </bean> <!-- 消息主題 --> <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="usertopic"></constructor-arg> </bean> <!-- 定義模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMQConnectionFactory"></property> </bean>
2)發送消息
package com.cn.activemq; import com.cn.pojo.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; @Component public class SendMessageMQUtil { @Autowired private JmsTemplate jmsTemplate; @Autowired @Qualifier("queue") private Destination queue; @Autowired @Qualifier("topic") private Destination topic; /** * 隊列--發送消息 * @param user */ public void sendUserQueue(final User user){ jmsTemplate.send(queue, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(user); } }); } /** * 主題--發送消息 * @param user */ public void sendUserTopic(final User user){ jmsTemplate.send(topic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(user); } }); } }
3)接收消息
package com.cn.activemq; import com.cn.pojo.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.ObjectMessage; @Component public class ReceiveMessageMQUtil { @Autowired private JmsTemplate jmsTemplate; @Autowired @Qualifier("queue") private Destination queue; @Autowired @Qualifier("topic") private Destination topic; /** * 隊列--接受消息 * @return */ public User receiveUserQueue(){ try { ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(queue); return (User)objectMessage.getObject(); } catch (JMSException e) { e.printStackTrace(); } return null; } /** * 主題--接受消息 * @return */ public User receiveUserTopic(){ try { ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(topic); return (User)objectMessage.getObject(); } catch (JMSException e) { e.printStackTrace(); } return null; } }
4)測試
分別新建測試類
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springMvc.xml"})//不可使用classpath:spring-*.xml,否則配置文件不起作用 public class SendMessageMQUtilTest { @Autowired private SendMessageMQUtil sendMessageMQUtil; @Test public void sendUserQueue() throws Exception { User user=new User("computer1", "111111"); sendMessageMQUtil.sendUserQueue(user); } @Test public void sendUserTopic() throws Exception { User user=new User("computer2", "222222"); sendMessageMQUtil.sendUserTopic(user); } }
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springMvc.xml"}) public class ReceiveMessageMQUtilTest { @Autowired private ReceiveMessageMQUtil receiveMessageMQUtil; @Test public void receiveUserQueue() throws Exception { User user=receiveMessageMQUtil.receiveUserQueue(); System.out.println("ActiveMQ接收到的數據:"+user); } @Test public void receiveUserTopic() throws Exception { User user=receiveMessageMQUtil.receiveUserTopic(); System.out.println("ActiveMQ接收到的數據:"+user); } @Test public void receiveUserTopic2() throws Exception { User user=receiveMessageMQUtil.receiveUserTopic(); System.out.println("ActiveMQ接收到的數據:"+user); } }
分別運行測試方法,結合activeMQ的控制台可以看出隊列、主題以及各自的消費者等情況
顯示隊列:
顯示主題:
3、其它
1)設置預設的目的地
上述在發送消息和接收消息時,每次調用發送/接收消息的方法都傳入了一個目的地參數。然而,可以在JmsTemplate實例化的時候,指定預設的目的地,如下:
<!-- 定義隊列 --> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="userqueue"></constructor-arg> </bean> <!-- 定義模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMQConnectionFactory"></property> <property name="defaultDestination" ref="queue"></property>//此處註入隊列,也可以註入主題 </bean>
採用指定預設目的地的方式,則發送/接收消息調用的方法不用傳遞目的地了
/** * 隊列--發送消息 * @param user */ public void sendUserQueue(final User user){ jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(user); } }); /** * 隊列--接受消息 * @return */ public User receiveUserQueue(){ try { ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(); return (User)objectMessage.getObject(); } catch (JMSException e) { e.printStackTrace(); } return null; } }
2)發送消息對消息進行轉換
除了send(..)方法,JmsTemplate還提供了convertAndSend()方法,該方法不需要MessageCreator參數,而使用內置的消息轉換器創建消息併發送。在JmsTemplate實例化時未指定消息轉換器,在調用convertAndSend()方法則使用預設的SimpleMessageConverter消息轉換器;receiveAndConvert()方法則在接收時候使用消息轉換器
/** * 隊列--發送消息 * @param user */ public void sendUserQueue(final User user){ jmsTemplate.convertAndSend(user); } /** * 隊列--接受消息 * @return */ public User receiveUserQueue(){ return (User)jmsTemplate.receiveAndConvert(); }
在JmsTemplate實例化指定消息轉換器,則會在使用convertAndSend()/receiveAndConvert()方法使用指定的消息轉換器
<!-- 定義隊列 --> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="userqueue"></constructor-arg> </bean> <!-- 消息轉換器 --> <bean id="mappingJackson2MessageConverter" class="org.springframework.jms.support.converter.MappingJackson2MessageConverter"></bean> <!-- 定義模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMQConnectionFactory"></property> <property name="defaultDestination" ref="queue"></property> <property name="messageConverter" ref="mappingJackson2MessageConverter"></property> </bean>
綜合1)2),使用convertAndSend()/receiveAndConvert()發送和接收消息更加簡單;在某些情況下,統一配置目的地也簡化的使用