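Project structure:
Define the versions of the jar dependencies. The versions matter: spring-rabbit depends on Spring, so the two must be compatible, otherwise errors are raised: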
<properties>
    <springframework.version>4.2.7.RELEASE</springframework.version>
    <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version>
    <junit.version>4.12</junit.version>
</properties>
dependencies:
<dependencies>
    <!-- LOGGING begin -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.0.13</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.0.13</version>
    </dependency>
    <!-- Code that calls commons-logging directly is bridged to slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>1.7.5</version>
    </dependency>
    <!-- LOGGING end -->

    <!-- springframework -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${springframework.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${springframework.version}</version>
    </dependency>

    <!-- RabbitMQ Spring dependency -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring-rabbit.version}</version>
    </dependency>

    <!-- common utils -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>

    <!-- test begin -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${springframework.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <!-- test end -->
</dependencies>
spring-applicationContext:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-3.1.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="fileEncoding" value="UTF-8"></property>
        <property name="locations">
            <list>
                <value>classpath:applicationContext.properties</value>
            </list>
        </property>
    </bean>

    <context:annotation-config/>
    <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>

    <!-- Component scan path -->
    <context:component-scan base-package="demo"></context:component-scan>

    <!-- RabbitMQ server parameters -->
    <rabbit:connection-factory id="connectionFactory"
                               username="${paycenter.mq.user.username}"
                               password="${paycenter.mq.user.password}"
                               addresses="${paycenter.mq.user.host}"></rabbit:connection-factory>

    <import resource="classpath:mq-applicationContext-producer.xml"/>
    <import resource="classpath:mq-applicationContext-consumer.xml"/>
</beans>
mq-applicationContext-producer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <!-- With the admin declared below, the exchanges and queues defined in this producer file are created automatically on the RabbitMQ server -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- SimpleMessageConverter is Spring AMQP's default converter. It handles String, byte[] and Serializable payloads and does not produce JSON; use a Jackson-based converter such as Jackson2JsonMessageConverter if JSON is required -->
    <bean id="mqMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter">
    </bean>

    <!--<bean id="publisherConfirmsReturns" class="com.emaxcard.mq.rabbit.PublisherConfirmsReturns"></bean>-->

    <!--======================== delay queue configuration begin =========================-->
    <!-- Work queue: receives messages dead-lettered from agentpayqueryQueue1; this is the queue the consumer listens on -->
    <rabbit:queue id="agentpayqueryQueue2" durable="true" auto-delete="true" exclusive="false" name="agentpayqueryQueue2"/>
    <rabbit:direct-exchange id="agentpayqueryExchange2" durable="true" auto-delete="true" name="agentpayqueryExchange2">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryQueue2" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Holding queue: no consumer; messages expire after 10000 ms and are dead-lettered to agentpayqueryExchange2 -->
    <rabbit:queue id="agentpayqueryQueue1" durable="true" auto-delete="true" exclusive="false" name="agentpayqueryQueue1">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="agentpayqueryExchange2"/>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:direct-exchange id="agentpayqueryExchange1" durable="true" auto-delete="true" name="agentpayqueryExchange1">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryQueue1" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Define the RabbitTemplate instance -->
    <!--confirm-callback="publisherConfirmsReturns" return-callback="publisherConfirmsReturns"-->
    <rabbit:template id="agentpayQueryMsgTemplate"
                     exchange="agentpayqueryExchange1"
                     routing-key="delay"
                     connection-factory="connectionFactory"
                     message-converter="mqMessageConverter"
                     mandatory="true"/>
    <!--======================== delay queue configuration end =========================-->
</beans>
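The configuration above builds the delay out of two pieces: a holding queue with `x-message-ttl` and no consumer, and a dead-letter exchange that forwards expired messages to the queue the consumer actually listens on. As a rough illustration of that flow only, here is an in-memory sketch in plain Java. It is not a RabbitMQ client; the class `DelayFlowSketch`, its methods, and the virtual clock passed in as `now` are all hypothetical names introduced for this example, and the exact expiry boundary is a simplification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of the TTL + dead-letter delay pattern (illustrative, not a RabbitMQ client). */
final class DelayFlowSketch {

    /** A message body plus the virtual time at which it entered the holding queue. */
    static final class Entry {
        final String body;
        final long enqueuedAt;

        Entry(String body, long enqueuedAt) {
            this.body = body;
            this.enqueuedAt = enqueuedAt;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> holdingQueue = new ArrayDeque<>(); // plays the role of agentpayqueryQueue1
    private final Deque<String> workQueue = new ArrayDeque<>();   // plays the role of agentpayqueryQueue2

    DelayFlowSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Producer side: messages only ever go to the holding queue. */
    void publish(String body, long now) {
        holdingQueue.addLast(new Entry(body, now));
    }

    /** Broker side: expired messages at the head are "dead-lettered" to the work queue. */
    void expire(long now) {
        while (!holdingQueue.isEmpty() && now - holdingQueue.peekFirst().enqueuedAt >= ttlMillis) {
            workQueue.addLast(holdingQueue.pollFirst().body);
        }
    }

    /** Consumer side: takes whatever has reached the work queue so far. */
    List<String> drainWorkQueue() {
        List<String> out = new ArrayList<>(workQueue);
        workQueue.clear();
        return out;
    }

    public static void main(String[] args) {
        DelayFlowSketch flow = new DelayFlowSketch(10_000);
        flow.publish("0", 0);
        flow.publish("1", 500);

        flow.expire(9_999);
        System.out.println(flow.drainWorkQueue()); // prints []

        flow.expire(10_000);
        System.out.println(flow.drainWorkQueue()); // prints [0]

        flow.expire(10_500);
        System.out.println(flow.drainWorkQueue()); // prints [1]
    }
}
```

The point of the sketch is that the consumer never sees a message before its TTL has elapsed, which is why nothing listens on agentpayqueryQueue1 in the real setup.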
mq-applicationContext-consumer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <bean id="agentpayQueryConsumer" class="demo.TestMQConsumer"/>

    <!-- TODO remove later
         receive-timeout: how long to wait for a message; affects when consumers (and their connections) are created and destroyed
         concurrency: number of consumers
         max-concurrency: maximum number of consumers
         min-start-interval: start consumers gradually, to reduce the cost of opening many connections under concurrent load (or a sudden network delay in a third-party system)
         min-stop-interval: stop consumers gradually, so that a sudden quiet period does not destroy many usable connections
         min-consecutive-active: after N consecutive receives without a timeout, another consumer is considered necessary
         min-consecutive-idle: after N consecutive receive timeouts, a consumer is considered ready to be destroyed
         prefetch: number of messages each consumer reads ahead. When the third party is called asynchronously and the bottleneck is the network and the third-party system, a value of 1 (the spring-rabbit 1.x default) means the next message is received only after the current one is ACKed. The container below uses 10.
         transaction-size: affects the effective prefetch count
    -->

    <!-- Listener -->
    <!-- Queue listener (observer pattern): when a message arrives, the listener object registered on that queue is notified -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" max-concurrency="20" concurrency="5" prefetch="10">
        <rabbit:listener ref="agentpayQueryConsumer" queue-names="agentpayqueryQueue2"/>
    </rabbit:listener-container>
</beans>
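The `prefetch` note in the configuration comment describes a window: the broker hands a consumer at most `prefetch` unacknowledged messages, and each ACK frees one slot. The following plain-Java sketch models just that rule. It is an illustration under simplifying assumptions (one consumer, no redelivery), and `PrefetchSketch` with its methods is a hypothetical name, not part of Spring AMQP or the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Minimal model of a prefetch window for a single consumer (illustrative only). */
final class PrefetchSketch {

    private final int prefetch;
    private final Deque<String> ready = new ArrayDeque<>();  // messages waiting in the queue
    private final List<String> unacked = new ArrayList<>();  // delivered but not yet acknowledged

    PrefetchSketch(int prefetch) {
        this.prefetch = prefetch;
    }

    void enqueue(String body) {
        ready.addLast(body);
    }

    /** Broker side: deliver ready messages until the consumer holds `prefetch` unacked ones. */
    List<String> deliver() {
        List<String> delivered = new ArrayList<>();
        while (unacked.size() < prefetch && !ready.isEmpty()) {
            String body = ready.pollFirst();
            unacked.add(body);
            delivered.add(body);
        }
        return delivered;
    }

    /** Consumer side: acknowledging a message frees one slot in the window. */
    void ack(String body) {
        unacked.remove(body);
    }

    public static void main(String[] args) {
        PrefetchSketch channel = new PrefetchSketch(1);
        channel.enqueue("0");
        channel.enqueue("1");
        channel.enqueue("2");

        System.out.println(channel.deliver()); // prints [0]
        System.out.println(channel.deliver()); // prints [] because the window is full
        channel.ack("0");
        System.out.println(channel.deliver()); // prints [1]
    }
}
```

With `prefetch="10"`, as in the listener container above, the same model would hand out up to ten messages before waiting for an ACK, which trades fairness between consumers for fewer round trips.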
Producer class:
package demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class TestMQProducer {

    private static final Logger logger = LoggerFactory.getLogger(TestMQProducer.class.getSimpleName());

    @Autowired
    private RabbitTemplate agentpayQueryMsgTemplate;

    @Test
    public void test() throws Exception {
        // Sends 101 messages (0..100) to the template's default exchange and routing key
        for (int i = 0; i <= 100; i++) {
            Object data = String.valueOf(i);
            agentpayQueryMsgTemplate.convertAndSend(data);
            logger.info("Enqueued: {}", data);
        }
        // Keep the context alive longer than the 10 s TTL so the consumer can receive the dead-lettered messages
        Thread.sleep(12000);
    }
}
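Consumer class:
package demo;

import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class TestMQConsumer implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(TestMQConsumer.class.getSimpleName());

    @Override
    public void onMessage(Message message) {
        // Decode with an explicit charset rather than the platform default
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        try {
            // Simulate slow processing
            Thread.sleep(1);
            logger.info("Dequeued: {}", data);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the container can stop this thread cleanly
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while processing: {}", data, e);
        }
    }
}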
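That completes the code.
Note: when defining the queues above I set the auto-delete attribute to true, so once the consumer has finished and closed its connection, the queue is deleted automatically. The same goes for the exchange. (In the MQ console, agentpayqueryQueue2 and agentpayqueryExchange2 from the example disappear after the run, while agentpayqueryQueue1 and agentpayqueryExchange1 still exist. This is because an auto-delete queue is removed only after its last consumer has gone, and agentpayqueryQueue1 never had a consumer; an auto-delete exchange is removed when its last queue is unbound, and agentpayqueryExchange1 still has agentpayqueryQueue1 bound to it.)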
How spring-rabbit-x.xsd describes the auto-delete attribute of queue and exchange:
Flag indicating that an queue will be deleted when it is no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:queue)
Flag indicating that an exchange will be deleted when no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:exchange)
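The result seen in the console (Queue2 and Exchange2 gone, Queue1 and Exchange1 still present) is consistent with the broker's rule that an auto-delete queue is removed when its last consumer cancels, and an auto-delete exchange when its last queue is unbound. Below is a small plain-Java model of that rule. It is a sketch only: `AutoDeleteSketch`, `SketchQueue` and `SketchExchange` are hypothetical names, and all objects in the model are assumed to be declared with auto-delete set to true.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of auto-delete semantics for queues and exchanges (illustrative only). */
final class AutoDeleteSketch {

    /** An auto-delete exchange: removed once its last bound queue is unbound. */
    static final class SketchExchange {
        final Set<SketchQueue> bound = new HashSet<>();
        boolean deleted;

        void unbind(SketchQueue queue) {
            if (bound.remove(queue) && bound.isEmpty()) {
                deleted = true;
            }
        }
    }

    /** An auto-delete queue: removed once it has had a consumer and the last one cancels. */
    static final class SketchQueue {
        final SketchExchange exchange;
        final Set<String> consumers = new HashSet<>();
        boolean deleted;

        SketchQueue(SketchExchange exchange) {
            this.exchange = exchange;
            exchange.bound.add(this);
        }

        void subscribe(String consumerTag) {
            consumers.add(consumerTag);
        }

        void cancel(String consumerTag) {
            // Only a queue that actually had this consumer can be deleted by its departure
            if (consumers.remove(consumerTag) && consumers.isEmpty()) {
                deleted = true;
                exchange.unbind(this); // deleting the queue drops its bindings
            }
        }
    }

    public static void main(String[] args) {
        SketchExchange exchange1 = new SketchExchange();
        SketchQueue queue1 = new SketchQueue(exchange1);   // holding queue, never consumed
        SketchExchange exchange2 = new SketchExchange();
        SketchQueue queue2 = new SketchQueue(exchange2);   // work queue with a listener

        queue2.subscribe("listener-1");
        queue2.cancel("listener-1");                       // consumer shuts down

        System.out.println(queue2.deleted + " " + exchange2.deleted); // prints true true
        System.out.println(queue1.deleted + " " + exchange1.deleted); // prints false false
    }
}
```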
About concurrency on the consumer side:
Again, see the description in spring-rabbit-x.xsd:
The number of concurrent consumers to start for each listener initially.
See also 'max-concurrency'.
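I set the value to 5 above; the queue's consumers as shown in the MQ console are in the figure below:
The dequeue log shows that 5 threads in total are consuming these messages.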
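To make the "5 threads" observation concrete without a broker, the sketch below starts a fixed number of worker threads that drain a shared in-memory queue and records how many messages each thread handled. It is a plain-Java analogy for `concurrency="5"`, not how the listener container is implemented; `ConcurrentConsumersSketch` and `consume` are hypothetical names, and how the messages are split between threads varies from run to run.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Analogy for concurrency="5": N worker threads competing for messages on one queue. */
final class ConcurrentConsumersSketch {

    /** Runs `consumers` threads over `messages` items and returns the per-thread message counts. */
    static Map<String, Integer> consume(int consumers, int messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add(String.valueOf(i));
        }

        Map<String, Integer> perThread = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                String name = Thread.currentThread().getName();
                perThread.putIfAbsent(name, 0);            // register even if this thread gets no messages
                while (queue.poll() != null) {
                    perThread.merge(name, 1, Integer::sum); // count one consumed message
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return perThread;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = consume(5, 101);
        System.out.println("threads: " + counts.size()); // prints threads: 5
        System.out.println(counts);                       // distribution differs between runs
    }
}
```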