RabbitMQ是一個消息中間件,在一些需要非同步處理、發佈/訂閱等場景的時候,使用RabbitMQ可以完成我們的需求。 下麵是我在學習java語言實現RabbitMQ(自RabbitMQ官網的Tutorials)的一些記錄。 首先有三個名稱瞭解一下(以下圖片來自rabbitMQ官網) produce ...
RabbitMQ是一個消息中間件,在一些需要非同步處理、發佈/訂閱等場景的時候,使用RabbitMQ可以完成我們的需求。 下麵是我在學習java語言實現RabbitMQ(自RabbitMQ官網的Tutorials)的一些記錄。
首先有三個名稱瞭解一下(以下圖片來自rabbitMQ官網)
producer
是用戶應用負責發送消息
queue
是存儲消息的緩衝(buffer)
consumer
是用戶應用負責接收消息
下麵是我使用rabbitMQ原生的jar包做的測試方法
maven pom.xml 加入
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.6</version>
</dependency>
方法實現示意圖
發送消息方法(Send.java)
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 5 public class Send { 6 7 private static final String QUEUE_NAME = "hello"; 8 9 public static void main(String[] args) throws Exception { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("192.168.1.7"); 12 factory.setPort(5672); 13 factory.setUsername("admin"); 14 factory.setPassword("admin"); 15 Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel(); 17 18 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 19 String message = "Hello World!"; 20 // "" 表示預設exchange 21 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 22 System.out.println(" [x] Sent '" + message + "'"); 23 24 channel.close(); 25 connection.close(); 26 } 27 28 }
10~16行 是獲取rabbitmq.client.Channel, rabbitMQ的API操作基本都是通過channel來完成的。
18行 channel.queueDeclare(QUEUE_NAME, false, false, false, null),這裡channel聲明瞭一個名字叫“hello”的queue,聲明queue的操作是冪等的,也就是說只有不存在相同名稱的queue的情況下才會創建一個新的queue。
21行 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()),chaneel在這個queue里發佈了消息(位元組數組)。
24~25行 則是鏈接的關閉,註意關閉順序就好了。
接受消息方法 (Recv.java)
1 import com.rabbitmq.client.AMQP; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 9 import java.io.IOException; 10 11 public class Recv { 12 13 private final static String QUEUE_NAME = "hello"; 14 15 public static void main(String[] argv) throws Exception { 16 ConnectionFactory factory = new ConnectionFactory(); 17 factory.setHost("192.168.1.7"); 18 factory.setPort(5672); 19 factory.setUsername("admin"); 20 factory.setPassword("admin"); 21 Connection connection = factory.newConnection(); 22 Channel channel = connection.createChannel(); 23 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 26 27 Consumer consumer = new DefaultConsumer(channel) { 28 @Override 29 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 30 throws IOException { 31 String message = new String(body, "UTF-8"); 32 System.out.println(" [x] Received '" + message + "'"); 33 } 34 }; 35 channel.basicConsume(QUEUE_NAME, true, consumer); 36 } 37 }
16~22行 和Send類中一樣,也是獲取同一個rabbitMQ服務的channel,這也是能接受到消息的基礎。
24行 同樣聲明瞭一個和Send類中發佈的queue相同的queue。
27~35行 DefaultConsumer
類實現了Consumer
介面,由於推送消息是非同步的,因此在這裡提供了一個callback來緩衝接受到的消息。
先運行Recv 然後再運行Send,就可以看到消息被接受輸出到控制台了,如果多啟動幾個Recv,會發現消息被每個消費者按順序分別消費了,
這也就是rabbitMQ預設採用Round-robin dispatching(輪詢分發機制)。
Work queues
上面簡單的實現了rabbitMQ消息的發送和接受,但是無論Send類中的queueDeclare 、basicPublish方法還有Recv類中的basicConsume方法都有很多的參數,
下麵我們分析一下幾個重要的參數。
(一)Message acknowledgment 消息答覆
上面Recv.java的第35行中,channel.basicConsume(QUEUE_NAME, true, consumer),
在Channel介面中定義為 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
這個autoAck我們當前實現為true,表示伺服器會自動確認ack,一旦RabbitMQ將一個消息傳遞到consumer,它馬上會被標記為刪除狀態。
這樣如果consumer在正常執行任務過程中,一旦consumer服務掛了,那麼我們就永遠的失去了這個consumer正在處理的所有消息。
為了防止這種情況,rabbitMQ支持Message acknowledgment,當消息被一個consumer接受並處理完成後,consumer發送給rabbitMQ一個回執,然後rabbitMQ才會刪除這個消息。
當一個消息掛了,rabbitMQ會給另外可用的consumer繼續發送上個consumer因為掛了而沒有處理成功的消息。
因此我們可以設置autoAck=false,來顯示的讓服務端做消息成功執行的確認。
(二)Message durability 消息持久化
Message acknowledgment 確保了consumer掛了的情況下,消息還可以被其他consumer接受處理,但是如果rabbitMQ掛了呢?
在聲明隊列的方法中,Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
durable=true 意味著該隊列將在伺服器重啟後繼續存在。Send和Recv兩個類中聲明隊列的方法都要設置durable=true。
現在,我們需要將消息標記為持久性——通過將MessageProperties(它實現BasicProperties)設置為PERSISTENT_TEXT_PLAIN
(三)Fair dispatch 公平分發
rabbitMQ預設是輪詢分發,這樣對多個consumer而言,可能就會出現負載不均衡的問題,無論是任務本身難易度,還是consumer處理能力的不同,都是導致這種問題。
為了處理這種情況我們可以使用basicQos
方法來設置prefetchCount = 1
。 這告訴rabbitMQ一次只給consumer一條消息,換句話來說,就是直到consumer發回ack,然後再向這個consumer發送下一條消息。
int
prefetchCount =
1
;
channel.basicQos(prefetchCount);
正是因為Fair dispatch是基於ack的,所有它最好和Message acknowledgment同時使用,否則在autoAck=true的情況下,單獨設置Fair dispatch並沒有效果。
下麵是本人測試以上三種情況的測試代碼,可以直接使用。
import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String QUEUE_NAME = "task_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.7"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable = true; //消息持久化 channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 多個消息使用空格分隔 Scanner sc = new Scanner(System.in); String[] splits = sc.nextLine().split(" "); for (int i = 0; i < splits.length; i++) { channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, splits[i].getBytes()); System.out.println(" [x] Sent '" + splits[i] + "'"); } channel.close(); connection.close(); } }
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.7"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // basicQos方法來設置prefetchCount = 1。 這告訴RabbitMQy一次只給worker一條消息,換句話來說,就是直到worker發回ack,然後再向這個worker發送下一條消息。 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 當consumer確認收到某個消息,並且已經處理完成,RabbitMQ可以刪除它時,consumer會向RabbitMQ發送一個ack(nowledgement)。 boolean autoAck = true; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } protected static void doWork(String message) throws InterruptedException { for (char ch: message.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
發佈/訂閱(Publish/Subscribe)
一個完整的rabbitMQ消息模型是會有Exchange的。
rabbitMQ的消息模型的核心思想是producer永遠不會直接發送任何消息到queue中,實際上,在很多情況下producer根本不知道一條消息是否被髮送到了哪個queue中。
在rabbitMQ中,producer僅僅將消息發送到一個exchange中。要理解exchange也非常簡單,它一邊負責接收producer發送的消息, 另一邊將消息推送到queue中。
exchange必須清楚的知道在收到消息之後該如何進行下一步的處理,比如是否應該將這條消息發送到某個queue中? 還是應該發送到多個queue中?還是應該直接丟棄這條消息等。
exchange模型如下:
exchange類型也有好幾種:direct
,topic
,headers
以及fanout。
Fanout exchange
下麵我們來創建一個fanout
類型的exchange,顧名思義,fanout會向所有的queue廣播所有收到的消息。
1 import java.io.IOException; 2 import java.util.Scanner; 3 import java.util.concurrent.TimeoutException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 9 import rabbitMQ.RabbitMQTestUtil; 10 11 public class EmitLog { 12 13 private static final String EXCHANGE_NAME = "logs"; 14 15 public static void main(String[] argv) throws IOException, TimeoutException { 16 17 ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 22 23 // 多個消息使用空格分隔 24 Scanner sc = new Scanner(System.in); 25 String[] splits = sc.nextLine().split(" "); 26 for (int i = 0; i < splits.length; i++) { 27 channel.basicPublish(EXCHANGE_NAME, "", null, splits[i].getBytes()); 28 System.out.println(" [x] Sent '" + splits[i] + "'"); 29 } 30 31 channel.close(); 32 connection.close(); 33 } 34 }
1 import java.io.IOException; 2 3 import com.rabbitmq.client.AMQP; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.ConnectionFactory; 7 import com.rabbitmq.client.Consumer; 8 import com.rabbitmq.client.DefaultConsumer; 9 import com.rabbitmq.client.Envelope; 10 11 import rabbitMQ.RabbitMQTestUtil; 12 13 public class ReceiveLogs { 14 15 private static final String EXCHANGE_NAME = "logs"; 16 17 public static void main(String[] argv) throws Exception { 18 ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); 19 Connection connection = factory.newConnection(); 20 Channel channel = connection.createChannel(); 21 22 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 23 String queueName = channel.queueDeclare().getQueue(); 24 channel.queueBind(queueName, EXCHANGE_NAME, ""); 25 26 Consumer consumer = new DefaultConsumer(channel) { 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, 29 AMQP.BasicProperties properties, byte[] body) throws IOException { 30 String message = new String(body, "UTF-8"); 31 System.out.println(" [x] Received '" + message + "'"); 32 } 33 }; 34 channel.basicConsume(queueName, true, consumer); 35 } 36 }
Direct exchange
在fanout的exchange類型中,消息的發佈已經隊列的綁定方法中,routingKey參數都是預設空值,因為fanout類型會直接忽略這個值,
但是在其他exchange類型中它擁有很重要的意義,
rabbitMQ支持以上兩種綁定,消息在發佈的時候,會指定一個routing key,而圖一中exchange會把routing key為orange
發送的消息將會被路由到queue Q1
中,使用routing key為black
或者green
的將會被路由到Q2
中。
將多個queue使用相同的binding key進行綁定也是可行的。可以在X和Q1中間增加一個routing key black
。 它會向所有匹配的queue進行廣播,使用routing key為black
發送的消息將會同時被Q1
和Q2
接收。
下麵是我測試debug和error兩種routing key發佈消息並接受處理消息的代碼:
import java.util.Scanner; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitMQ.RabbitMQTestUtil; public class EmitLog { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, TimeoutException { ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 多個消息使用空格分隔 Scanner sc = new Scanner(System.in); String[] splits = sc.nextLine().split(" "); for (int i = 0; i < splits.length; i++) { channel.basicPublish(EXCHANGE_NAME, splits[i], null, splits[i].getBytes()); System.out.println(" [x] Sent '" + splits[i] + "'"); } channel.close(); connection.close(); } }View Code
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import rabbitMQ.RabbitMQTestUtil; public class ReceiveLogsDebug { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "debug"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }View Code
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import rabbitMQ.RabbitMQTestUtil; public class ReceiveLogsError { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }View Code
發送輸入:
debug接受:
error接受:
Topic exchange
發送到topic exchange中的消息不能有一個任意的routing_key
——它必須是一個使用點分隔的單詞列表。單詞可以是任意的。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。
routing key的長度限製為255個位元組數。
binding key也必須是相同的形式。topic exchange背後的邏輯類似於direct——一條使用特定的routing key發送的消息將會被傳遞至所有使用與該routing key相同的binding key進行綁定的隊列中。 然而,對binding key來說有兩種特殊的情況:
- *(star)可以代替任意一個單詞
- #(hash)可以代替0個或多個單詞
和Direct exchange差不多,代碼就不copy了,有興趣的直接看看教程http://www.rabbitmq.com/tutorials/tutorial-five-java.html