消息隊列在現今數據量超大,併發量超高的系統中是十分常用的。本文將會對現時最常用到的幾款消息隊列框架 ActiveMQ、RabbitMQ、Kafka 進行分析對比。 詳細介紹 RabbitMQ 在 Sprinig 框架下的結構及實現原理,從Producer 端的事務、回調函數(ConfirmCallb... ...
前言
消息隊列在現今數據量超大,併發量超高的系統中是十分常用的。本文將會對現時最常用到的幾款消息隊列框架 ActiveMQ、RabbitMQ、Kafka 進行分析對比。
詳細介紹 RabbitMQ 在 Spring 框架下的結構及實現原理,從Producer 端的事務、回調函數(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 信息接收容器進行詳細的分析。通過對 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用類型介紹,深入剖析在消息處理各個傳輸環節中的原理及註意事項。
並舉以實例對死信隊列、持久化操作進行一一介紹。
目錄
一、RabbitMQ 與 AMQP 的關係
1.1 AMQP簡介
AMQP(Advanced Message Queue Protocol 高級消息隊列協議)是一個消息隊列協議,它支持符合條件的客戶端和消息代理中間件(message middleware broker)進行通訊。RabbitMQ 則是 AMQP 協議的實現者,主要用於在分散式系統中信息的存儲發送與接收,RabbitMQ 的伺服器端用 Erlang 語言編寫,客戶端支持多種開發語言:Python、.NET、Java、Ruby、C、PHP、ActionScript、XMPP、STOMP 等。
1.2 ActiveMQ、RabbitMQ、Kafka 對比
現在在市場上有 ActiveMQ、RabbitMQ、Kafka 等多個常用的消息隊列框架,與其他框架對比起來,RabbitMQ 在易用性、擴展性、高可用性、多協議、支持多語言客戶端等方面都有不俗表現。
1.2.1 AcitveMQ 特點
ActiveMQ 是 Apache 以 Java 語言開發的消息模型,它完美地支持 JMS(Java Message Service)消息服務,客戶端支持 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多種開主發語言,支持OpenWire、Stomp、REST、XMPP、AMQP 等多種協議。ActiveMQ 採用非同步消息傳遞方式,在設計上保證了多主機集群,客戶端-伺服器,點對點等模式的有效通信。從開始它就是按照 JMS 1.1 和 J2EE 1.4 規範進行開發,實現了消息持久化,XA,事務支撐等功能。經歷多年的升級完善,現今已成為 Java 應用開發中主流的消息解決方案。但相比起 RabbitMQ、Kafka 它的主要缺點表現為資源消耗比較大,吞吐量較低,在高併發的情況下系統支撐能力較弱。如果系統全程使用 Java 開發,其併發量在可控範圍內,或系統需要支持多種不同的協議,使用 ActiveMQ 可更輕便地搭建起消息隊列服務。
1.2.2 Kafka 特點
Kafka 天生是面向分散式系統開發的消息隊列,它具有高性能、容災性、可動態擴容等特點。Kafka 與生俱來的特點在於它會把每個Partition 的數據都備份到不同的伺服器當中,並與 ZooKeeper 配合,當某個Broker 故障失效時,ZooKeeper 服務就會將通知生產者和消費者,從備份伺服器進行數據恢復。在性能上 Kafka 也大大超越了傳統的 ActiveMQ、RabbitMQ ,由於 Kafka 集群可支持動態擴容,在負載量到達峰值時可動態增加新的伺服器進集群而無需重啟服務。但由於 Kafka 屬於分散式系統,所以它只能在同一分區內實現消息有序,無法實現全局消息有序。而且它內部的監控機制不夠完善,需要安裝插件,依賴ZooKeeper 進行元數據管理。如果系統屬於分散式管理機制,數據量較大且併發量難以預估的情況下,建議使用 Kafka 隊列。
1.2.3 RabbitMQ 對比
由於 ActiveMQ 過於依賴 JMS 的規範而限制了它的發展,所以 RabbitMQ 在性能和吞吐量上明顯會優於 ActiveMQ。
由於上市時間較長,在可用性、穩定性、可靠性上 RabbitMq 會比 Kafka 技術成熟,而且 RabbitMq 使用 Erlang 開發,所以天生具備高併發高可用的特點。而 Kafka 屬於分散式系統,它的性能、吞吐量、TPS 都會比 RabbitMq 要強。
二、RabbitMQ 的實現原理
2.1 生產者(Producer)、消費者(Consumer)、服務中心(Broker)之間的關係
首先簡單介紹 RabbitMQ 的運行原理,在 RabbitMQ 使用時,系統會先安裝並啟動 Broker Server,也就是 RabbitMQ 的服務中心。無論是生產者 (Producer),消費者(Consumer)都會通過連接池(Connection)使用 TCP/IP 協議(預設)來與 BrokerServer 進行連接。然後 Producer 會把 Exchange / Queue 的綁定信息發送到 Broker Server,Broker Server 根據 Exchange 的類型邏輯選擇對應 Queue ,最後把信息發送到與 Queue 關聯的對應 Consumer 。
2.2 交換器(Exchange)、隊列(Queue)、通道(Channel)、綁定(Binding)的概念
2.2.1 交換器 Exchange
Producer 建立連接後,並非直接將消息投遞到隊列 Queue 中,而是把消息發送到交換器 Exchange,由 Exchange 根據不同邏輯把消息發送到一個或多個對應的隊列當中。目前 Exchange 提供了四種不同的常用類型:Fanout、Direct、Topic、Header。
- Fanout類型
此類型是最為常見的交換器,它會將消息轉發給所有與之綁定的隊列上。比如,有N個隊列與 Fanout 交換器綁定,當產生一條消息時,Exchange 會將該消息的N個副本分別發給每個隊列,類似於廣播機制。
- Direct類型
此類型的 Exchange 會把消息發送到 Routing_Key 完全相等的隊列當中。多個 Cousumer 可以使用相同的關鍵字進行綁定,類似於資料庫的一對多關係。比如,Producer 以 Direct 類型的 Exchange 推送 Routing_Key 為 direct.key1 的隊列,系統再指定多個 Cousumer 綁定 direct.key1。如此,消息就會被分發至多個不同的 Cousumer 當中。
- Topic類型
此類型是最靈活的一種方式配置方式,它可以使用模糊匹配,根據 Routing_Key 綁定到包含該關鍵字的不同隊列中。比如,Producer 使用 Topic類型的 Exchange 分別推送 Routing_Key 設置為 topic.guangdong.guangzhou 、topic.guangdong.shenzhen 的不同隊列,Cousumer 只需要把 Routing_Key 設置為 topic.guangdong.# ,就可以把所有消息接收處理。
- Headers類型
該類型的交換器與前面介紹的稍有不同,它不再是基於關鍵字 Routing_Key 進行路由,而是基於多個屬性進行路由的,這些屬性比路由關鍵字更容易表示為消息的頭。也就是說,用於路由的屬性是取自於消息 Header 屬性,當消息 Header 的值與隊列綁定時指定的值相同時,消息就會路由至相應的隊列中。
2.2.2 Queue 隊列
Queue 隊列是消息的載體,每個消息都會被投入到 Queue 當中,它包含 name,durable,arguments 等多個屬性,name 用於定義它的名稱,當 durable(持久化)為 true 時,隊列將會持久化保存到硬碟上。反之為 false 時,一旦 Broker Server 被重啟,對應的隊列就會消失,後面還會有例子作詳細介紹。
2.2.3 Channel 通道
當 Broker Server 使用 Connection 連接 Producer / Cousumer 時會使用到通道(Channel),一個 Connection上可以建立多個 Channel,每個 Channel 都有一個會話任務,可以理解為邏輯上的連接。主要用作管理相關的參數定義,發送消息,獲取消息,事務處理等。
2.2.4 Binding 綁定
Binding 主要用於綁定交換器 Exchange 與 隊列 Queue 之間的對應關係,並記錄路由的 Routing-Key。Binding 信息會保存到系統當中,用於 Broker Server 信息的分發依據。
三、RabbitMQ 應用實例
3.1 Rabbit 常用類說明
3.1.1 RabbitTemplate 類
Spring 框架已經封裝了 RabbitTemplate 對 RabbitMQ 的綁定、隊列發送、接收進行簡化管理
方法 | 說明 |
void setExchange(String exchange) | 設置綁定的 exchange 名稱 |
String getExchange() | 獲取已綁定的 exchange 名稱 |
void setRoutingKey(String routingKey) | 設置綁定的 routingKey |
String getRoutingKey() | 獲取已綁定的 routingKey |
void send(String exchange, String routingKey, Message message,CorrelationData data) | 以Message方式發送信息到 Broken Server,CorrelationData 為標示符可為空 |
void convertAndSend(String exchange, String routingKey, Object object, CorrelationData data) | 以自定義對象方式發送信息到 Broken Server,系統將自動把 object轉換成 Message,CorrelationData 為標示符可為空 |
Message receive(String queueName, long timeoutMillis) | 根據queueuName接收隊列發送Message信息 |
Object receiveAndConvert(String queueName, long timeoutMillis) | 根據queueuName接收隊列對象信息 |
void setReceiveTimeout(long receiveTimeout) | 設置接收過期時間 |
void setReplyTimeout(long replyTimeout) | 設置重發時間 |
void setMandatory(boolean mandatory) | 開啟強制委托模式(下文會詳細說明) |
void setConfirmCallback(confirmCallback) | 綁定消息確認回調方法(下文會詳細說明) |
void setReturnCallback(returnCallback) | 綁定消息退出回調方法(下文會詳細說明) |
3.2 初探 RabbitMQ
在官網下載併成功安裝完 RabbitMQ 後,打開預設路徑 http://localhost:15672/#/ 即可看到 RabbitMQ 服務中心的管理界面
3.2.1 Producer 端開發
先在 pom 中添加 RabbitMQ 的依賴,併在 application.yml 中加入 RabbitMQ 帳號密碼等信息。此例子,我們嘗試使用 Direct 交換器把隊列發送到不同的 Consumer。
1 **********************pom ************************* 2 <project> 3 ............. 4 <dependency> 5 <groupId>org.springframework.boot</groupId> 6 <artifactId>spring-boot-starter-amqp</artifactId> 7 <version>2.0.5.RELEASE</version> 8 </dependency> 9 </project> 10 11 **************** application.yml **************** 12 spring: 13 application: 14 name: rabbitMqProducer 15 rabbitmq: 16 host: localhost 17 port: 5672 18 username: admin 19 password: 12345678 20 virtual-host: /LeslieHost
首先使用 CachingConnectionFactory 建立鏈接,通過 BindingBuilder 綁定 Exchange、Queue、RoutingKey之間的關係。
然後通過 void convertAndSend (String exchange, String routingKey, Object object, CorrelationData data) 方法把信息發送到 Broken Server
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String second="direct.second"; 34 public final static String Exchange_NAME="directExchange"; 35 public final static String RoutingKey1="directKey1"; 36 public final static String RoutingKey2="directKey2"; 37 38 @Bean 39 public Queue queueFirst(){ 40 return new Queue(first); 41 } 42 43 @Bean 44 public Queue queueSecond(){ 45 return new Queue(second); 46 } 47 48 @Bean 49 public DirectExchange directExchange(){ 50 return new DirectExchange(Exchange_NAME,true,true); 51 } 52 53 //利用BindingBuilder綁定Direct與queueFirst 54 @Bean 55 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 56 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 57 } 58 59 //利用BindingBuilder綁定Direct與queueSecond 60 @Bean 61 public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){ 62 return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 63 } 64 } 65 66 @Controller 67 @RequestMapping("/producer") 68 public class ProducerController { 69 @Autowired 70 private RabbitTemplate template; 71 72 @RequestMapping("/send") 73 public void send() { 74 for(int n=0;n<100;n++){ 75 76 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm the first queue! "+String.valueOf(n),getCorrelationData()); 77 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm the second queue! "+String.valueOf(n),getCorrelationData()); 78 } 79 } 80 81 private CorrelationData getCorrelationData(){ 82 return new CorrelationData(UUID.randomUUID().toString()); 83 } 84 }
此時,打開 RabbitMQ 管理界面,可看到 Producer 已經向 Broken Server 的 direct.first / direct.second 兩個 Queue 分別發送100 個 Message
3.2.2 Consumer 端開發
分別建立兩個不同的 Consumer ,一個綁定 direct.first 別一個綁定 direct.second , 然後通過註解 @RabbitListener 監聽不同的 queue,當接到到 Producer 推送隊列時,顯示隊列信息。
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 public class BindingConfig { 32 public final static String first="direct.first"; 33 public final static String Exchange_NAME="directExchange"; 34 public final static String RoutingKey1="directKey1"; 35 36 @Bean 37 public Queue queueFirst(){ 38 return new Queue(first); 39 } 40 41 @Bean 42 public DirectExchange directExchange(){ 43 return new DirectExchange(Exchange_NAME); 44 } 45 46 //利用BindingBuilder綁定Direct與queueFirst 47 @Bean 48 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 49 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 50 } 51 } 52 53 @Configuration 54 @RabbitListener(queues="direct.first") 55 public class RabbitMqListener { 56 57 @RabbitHandler 58 public void handler(String message){ 59 System.out.println(message); 60 } 61 } 62 63 @SpringBootApplication 64 public class App { 65 66 public static void main(String[] args){ 67 SpringApplication.run(App.class, args); 68 } 69 }
運行後可以觀察到不同的 Consumer 會收到不同隊列的消息
如果覺得使用 Binding 代碼綁定過於繁瑣,還可以直接在監聽類RabbitMqListener中使用 @QueueBinding 註解綁定
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 factory.setHost(host); 22 factory.setPort(port); 23 factory.setUsername(username); 24 factory.setPassword(password); 25 factory.setVirtualHost(virtualHost); 26 return factory; 27 } 28 } 29 30 @Configuration 31 @RabbitListener(bindings=@QueueBinding( 32 exchange=@Exchange(value="directExchange"), 33 value=@Queue(value="direct.second"), 34 key="directKey2")) 35 public class RabbitMqListener { 36 37 @RabbitHandler 38 public void handler(String message){ 39 System.out.println(message); 40 } 41 } 42 43 @SpringBootApplication 44 public class App { 45 46 public static void main(String[] args){ 47 SpringApplication.run(App.class, args); 48 } 49 }
運行結果
四、Producer 端的消息發送與監控
前面一節已經介紹了RabbitMQ的基本使用方法,這一節將從更深入的層面講述 Producer 的應用。
試想一下這種的情形,如果因 RabbitTemplate 發送時 Exchange 名稱綁定錯誤,或 Broken Server 因網路問題或服務負荷過大引發異常,Producer 發送的隊列丟失,系統無法正常工作。此時,開發人員應該進行一系列應對措施進行監測,確保每個數據都能正常推送到 Broken Server 。有見及此,RabbitMQ 專門為大家提供了兩種解決方案,一是使用傳統的事務模式,二是使用回調函數,下麵為大家作詳介紹。
4.1 Producer 端的事務管理
在需要使用事務時,可以通過兩種方法
第一可以調用 channel 類的方法以傳統模式進行管理,事務開始時調用 channel.txSelect(),信息發送後進行確認 channel.txCommit(),一旦捕捉到異常進行回滾 channel.txRollback(),最後關閉事務。
1 @Controller 2 @RequestMapping("/producer") 3 public class ProducerController { 4 @Autowired 5 private RabbitTemplate template; 6 7 @RequestMapping("/send") 8 public void send1(HttpServletResponse response) 9 throws InterruptedException, IOException, TimeoutException{ 10 Channel channel=template.getConnectionFactory().createConnection().createChannel(true); 11 ....... 12 try{ 13 channel.txSelect(); 14 channel.basicPublish("ErrorExchange", BindingConfig.Routing_Key_First, new AMQP.BasicProperties(),"Nothing".getBytes()); 15 channel.txCommit(); 16 }catch(Exception e){ 17 channel.txRollback(); 18 }finally{ 19 channel.close(); 20 } 21 ...... 22 ...... 23 ...... 24 } 25 }
第二還可以直接通過 RabbitTemplate 的配置方法 void setChannelTransacted(bool isTransacted) 直接開啟事務
1 public class ProducerController { 2 @Autowired 3 private ConnectionConfig connection; 4 5 @Autowired 6 @Bean 7 private RabbitTemplate template(){ 8 RabbitTemplate template=new RabbitTemplate(connection.getConnectionFactory()); 9 template.setChannelTransacted(true); 10 return template; 11 } 12 13 @RequestMapping("/send") 14 @Transactional(rollbackFor=Exception.class) 15 public void send(HttpServletResponse response) throws InterruptedException, IOException,TimeoutException{ 16 .......... 17 .......... 18 .......... 19 } 20 }
4.2 利用 ConfirmCallback 回調確認消息是否成功發送到 Exchange
使用事務模式消耗的系統資源比較大,系統往往會處理長期等待的狀態,在併發量較高的時候也有可能造成死鎖的隱患。有見及此,系統提供了輕量級的回調函數方式進行非同步處理。
當需要確認消息是否成功發送到 Exchange 的時候,可以使用 ConfirmCallback 回調函數。使用該函數,系統推送消息後,該線程便會得到釋放,等 Exchange 接收到消息後系統便會非同步調用 ConfirmCallback 綁定的方法進行處理。ConfirmCallback 只包含一個方法 void confirm(CorrelationData correlationData, boolean ack, String cause),此方法會把每條數據發送到 Exchange 時候的 ack 狀態(成功/失敗),cause 成敗原因,及對應的 correlationData(CorrelationData 只包含一個屬性 id,是綁定發送對象的唯一標識符) 返還到 Producer,讓Producer 進行相應處理。
註意:在綁定 ConfirmCallback 回調函數前,請先把 publisher-confirms 屬性設置為 true
1 spring: 2 application: 3 name: rabbitmqproducer 4 rabbitmq: 5 host: 127.0.0.1 6 port: 5672 7 username: admin 8 password: 12345678 9 virtual-host: /LeslieHost
例如:下麵的例子,特意將 RabbitTemplate 發送時所綁定的 Exchange 名稱填寫為錯誤名稱 “ ErrorExchange ”,造成發送失敗,然後在回調函數中檢查失敗的原因。
Producer 端代碼:
1 @Configuration 2 public class ConnectionConfig { 3 @Value("${spring.rabbitmq.host}") 4 public String host; 5 6 @Value("${spring.rabbitmq.port}") 7 public int port; 8 9 @Value("${spring.rabbitmq.username}") 10 public String username; 11 12 @Value("${spring.rabbitmq.password}") 13 public String password; 14 15 @Value("${spring.rabbitmq.virtual-host}") 16 public String virtualHost; 17 18 @Bean 19 public ConnectionFactory getConnectionFactory(){ 20 CachingConnectionFactory factory=new CachingConnectionFactory(); 21 System.out.println(host); 22 factory.setHost(host); 23 factory.setPort(port); 24 factory.setUsername(username); 25 factory.setPassword(password); 26 factory.setVirtualHost(virtualHost); 27 factory.setPublisherConfirms(true); 28 factory.setPublisherReturns(true); 29 return factory; 30 } 31 } 32 33 @Configuration 34 public class BindingConfig { 35 public final static String first="direct.first"; 36 public final static String Exchange_NAME="directExchange"; 37 public final static String RoutingKey1="directKey1"; 38 39 @Bean 40 public Queue queueFirst(){ 41 return new Queue(first); 42 } 43 44 @Bean 45 public DirectExchange directExchange(){ 46 return new DirectExchange(Exchange_NAME); 47 } 48 49 @Bean 50 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){ 51 return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 52 } 53 } 54 55 @Component 56 public class MyConfirmCallback implements ConfirmCallback { 57 58 @Override 59 public void confirm(CorrelationData correlationData, boolean ack, String cause) { 60 // TODO 自動生成的方法存根 61 // TODO 自動生成的方法存根 62 if(ack){ 63 System.out.println(correlationData.getId()+" ack is: true! \ncause:"+cause); 64 }else 65 System.out.println(correlationData.getId()+" ack is: false! \ncause:"+cause); 66 } 67 } 68 69 @Controller 70 @RequestMapping("/producer") 71 public class ProducerController { 72 @Autowired 73 private RabbitTemplate template; 74 @Autowired 75 private MyConfirmCallback confirmCallback; 76 77 @RequestMapping("/send") 78 public void send() { 79 template.setConfirmCallback(confirmCallback); 80 for(int n=0;n<2;n++){ 81 template.convertAndSend("ErrorExchange", 82 BindingConfig.RoutingKey1,"I'm the first queue! " 83 +String.valueOf(n),getCorrelationData()); 84 } 85 } 86 87 private CorrelationData getCorrelationData(){ 88 return new CorrelationData(UU