前面幾篇講解瞭如何使用rabbitMq,這一篇主要講解spring集成rabbitmq。 首先引入配置文件org.springframework.amqp,如下 一:配置消費者和生成者公共部分 二:配置生成者 三:生產者程式 其中convertAndSend方法預設第一個參數是交換機名稱,第二個參數 ...
前面幾篇講解瞭如何使用rabbitMq,這一篇主要講解spring集成rabbitmq。
首先引入配置文件org.springframework.amqp,如下
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.6.0.RELEASE</version> </dependency>
一:配置消費者和生成者公共部分
<rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.virtualHost}" channel-cache-size="50"/> <rabbit:admin connection-factory="connectionFactory"/> <!--定義消息隊列--> <rabbit:queue name="spittle.alert.queue.1" durable="true" auto-delete="false"/> <rabbit:queue name="spittle.alert.queue.2" durable="true" auto-delete="false"/> <rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/> <!--綁定隊列--> <rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true"> <rabbit:bindings> <rabbit:binding queue="spittle.alert.queue.1"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.2"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.3"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange>
二:配置生成者
<import resource="amqp-share.xml"/> <!--創建消息隊列模板--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spittle.fanout" message-converter="jsonMessageConverter"> </rabbit:template> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
三:生產者程式
public class Spittle implements Serializable { private Long id; private Spitter spitter; private String message; private Date postedTime; public Spittle(Long id, Spitter spitter, String message, Date postedTime) { this.id = id; this.spitter = spitter; this.message = message; this.postedTime = postedTime; } public Long getId() { return this.id; } public String getMessage() { return this.message; } public Date getPostedTime() { return this.postedTime; } public Spitter getSpitter() { return this.spitter; } }
public class ProducerMain { public static void main(String[] args) throws Exception { ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-producer.xml"); AmqpTemplate template = (AmqpTemplate) context.getBean("rabbitTemplate"); for (int i = 0; i < 20; i++) { System.out.println("Sending message #" + i); Spittle spittle = new Spittle((long) i, null, "Hello world (" + i + ")", new Date()); template.convertAndSend(spittle); Thread.sleep(5000); } System.out.println("Done!"); } }
其中convertAndSend方法預設第一個參數是交換機名稱,第二個參數是路由名稱,第三個才是我們發送的數據,現在我們啟動程式,效果如下
第四個:消費者程式
首先編寫一個用於監聽生產者發送信息的代碼
/** * Created by Administrator on 2016/11/18. */ public class SpittleAlertHandler implements MessageListener { @Override public void onMessage(Message message) { try { String body=new String(message.getBody(),"UTF-8"); System.out.println(body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }
一定要註意實現MessageListener,我們只需要獲取message的body即可,通過json來轉換我們需要的程式(比如我們可以發送一個map,map存放方法和實體,這樣我們可以通過反射來調用不同的程式來運行)。
下麵我們配置消費者
<import resource="amqp-share.xml"/> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="spittleListener" method="onMessage" queues="spittle.alert.queue.1,spittle.alert.queue.3,spittle.alert.queue.2"/> </rabbit:listener-container> <bean id="spittleListener" class="com.lp.summary.rabbitmq.impl.SpittleAlertHandler"/>
其中spittleListener是監聽的程式,method是執行的方法,queues是我們監聽的隊列,多個隊列可以逗號隔開(因為我們採用的是分發,所以三個隊列獲取的消息是相同的,這裡為了簡便我放在一個監聽程式中了,其實我們可以寫三個消費者,每個消費者監聽一個隊列)
現在只需要啟動程式即可運行
public class ConsumerMain { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-consumer.xml"); } }
當然direct跟上面的情況差不多,只不過這個是根據路由匹配,先把數據發送到交換機,然後綁定路由和隊列,通過交換機id和路由來找到隊列,下麵是一些主要的配置
<rabbit:queue id="spring-test-queue1" durable="true" auto-delete="false" exclusive="false" name="spring-test-queue1"></rabbit:queue> <rabbit:queue name="spring-test-queue2" durable="true" auto-delete="false" exclusive="false"></rabbit:queue> <!--交換機定義--> <!--rabbit:direct-exchange:定義exchange模式為direct, 意思就是消息與一個特定的路由鍵完全匹配,才會轉發。 rabbit:binding:設置消息queue匹配的key--> <rabbit:direct-exchange name="${rabbit.exchange.direct}" durable="true" auto-delete="false" id="${rabbit.exchange.direct}"> <rabbit:bindings> <rabbit:binding queue="spring-test-queue1" key="spring.test.queueKey1"/> <rabbit:binding queue="spring-test-queue2" key="spring.test.queueKey2"/> </rabbit:bindings> </rabbit:direct-exchange> <!--spring template聲明--> <rabbit:template exchange="${rabbit.exchange.direct}" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"></rabbit:template> <!--消息對象轉成成json--> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
下麵是消費者監聽配置
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="spring-test-queue1" method="onMessage" ref="queueListenter"></rabbit:listener> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="spring-test-queue2" method="onMessage" ref="queueListenter"></rabbit:listener> </rabbit:listener-container>
下麵是程式
public static void main(String[] args) { ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-producer.xml"); MQProducer mqProducer=(MQProducer) context.getBean("mqProducer"); mqProducer.sendDateToQueue("spring.test.queueKey1","Hello World spring.test.queueKey1"); mqProducer.sendDateToQueue("spring.test.queueKey2","Hello World spring.test.queueKey2"); }
實際情況可能需要我們去分離消費者和生成者的程式。當然spring還有負載均衡的配置,這裡就不多介紹了。