在上一章的指南中,我們寫了一個命名隊列:生產者往該命名隊列發送消息、消費從從該命名隊列中消費消息。在本章中,我們將創建一個工作隊列,用於在多個工作者之間分配耗時的任務。工作隊列(即任務隊列)的主要思想是避免立即執行那些需要等他們執行完成的資源密集型任務。相反,我們將任務安排在稍後完成。我們將任務封裝 ...
在上一章的指南中,我們寫了一個命名隊列:生產者往該命名隊列發送消息、消費從從該命名隊列中消費消息。在本章中,我們將創建一個工作隊列,用於在多個工作者之間分配耗時的任務。工作隊列(即任務隊列)的主要思想是避免立即執行那些需要等他們執行完成的資源密集型任務。相反,我們將任務安排在稍後完成。我們將任務封裝為消息並將其發送到隊列,後臺運行的工作進程將取出任務並執行完成。如果你啟動了多個工作者,這些任務將在多個工作者之間分享。
這個概念也即我們說的非同步,在項目中,有時候一個簡單的Web請求,後臺要做一系統的操作,這時候,如果後臺執行完成之後再給前臺返回消息將會導致瀏覽器頁面等待從而出現假死狀態。因此,通常的做法是,在這個Http請求到後臺,後臺獲取到正確的參數等信息後立即給前臺返回一個成功標誌,然後後臺非同步地進行後續的操作。
1、準備
本章中,我們將發送字元串消息來模擬複雜的任務。這裡因為沒有一個真實的複雜任務,因此用Thread.sleep()方法來模擬複雜耗時的任務。我們用字元串中的含點(“.")的數量來表示任務的複雜程度,一個點表示一秒鐘的耗時,例如:一個發送”Hello ...“字元串的任務將會耗時3秒鐘。
我們可以直接將上一章中的Send.java代碼拿過來修改,允許從命令行發送消息。本程式將會把任務調試到工作隊列,因此,我們將類名改為NewTask.java:
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
此時完整的NewTask.java代碼為:
1 public class NewTask { 2 3 private final static String QUEUE_NAME = "hello"; 4 5 public static void main(String[] argv) throws IOException, TimeoutException { 6 7 ConnectionFactory connectionFactory = new ConnectionFactory(); 8 connectionFactory.setHost("HOST"); 9 10 try(Connection connection = connectionFactory.newConnection(); 11 Channel channel = connection.createChannel()) { 12 13 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 14 15 String message = String.join(" ", argv); 16 17 channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); 18 System.out.println(" [x] Sent '" + message + "'"); 19 } 20 } 21 }
之前的Recv.java也要做一些修改:模擬字元串消息中的每個點耗時1秒鐘,它將處理傳送過來的消息並執行任務,因此,我們修改為Work.java:
1 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 2 String message = new String(delivery.getBody(), "UTF-8"); 3 4 System.out.println(" [x] Received '" + message + "'"); 5 try { 6 doWork(message); 7 } finally { 8 System.out.println(" [x] Done"); 9 } 10 }; 11 boolean autoAck = true; // acknowledgment is covered below 12 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
我們模擬執行過程中耗時的偽任務:
1 private static void doWork(String task) throws InterruptedException { 2 for (char ch: task.toCharArray()) { 3 if (ch == '.') Thread.sleep(1000); 4 } 5 }
此時完整的Work.java為:
1 public class Worker { 2 private final static String TASK_QUEUE_NAME = "hello"; 3 4 public static void main(String[] args) throws Exception { 5 6 ConnectionFactory connectionFactory = new ConnectionFactory(); 7 connectionFactory.setHost("HOST"); 8 9 Connection connection = connectionFactory.newConnection(); 10 Channel channel = connection.createChannel(); 11 channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null); 12 13 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 14 String message = new String(delivery.getBody(), "UTF-8"); 15 16 System.out.println(" [x] Received '" + message + "'"); 17 try { 18 doWork(message); 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 } finally { 22 System.out.println(" [x] Done"); 23 } 24 }; 25 26 boolean autoAck = true; // acknowledgment is covered below 27 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); 28 } 29 30 private static void doWork(String task) throws InterruptedException { 31 for (char ch: task.toCharArray()) { 32 if (ch == '.') Thread.sleep(1000); 33 } 34 } 35 }
2、迴圈調度
使用工作隊列的優點之一是能夠輕鬆地進行並行化操作。假設我們在做一個後臺日誌收集系統,我們可以很容易地增加更多的Worker從而提高系統性能。
首先,我們同時啟動兩個Worker,同樣地,我這裡也放到IDEA中啟動:
接下來,我們先後啟動5個Task,並分別通過main()參數傳入五個字元串消息:
1 First message. 2 Second message.. 3 Third message... 4 Fourth message.... 5 Fifth message.....
執行五個發送任務之後,來看一下兩個Worker都接收到了什麼樣的消息:
預設情況下,RabbitMQ將按順序將每個消息發送給下一個使用者。平均每個消費者將得到相同數量的消息。這種消息的調度方式稱之為迴圈調度,你可以開啟更多的Worker來進行測試。
3、消息回執
因為消費者執行一個任務會有時間耗時,假設一個消費者在執行一個任務執行一半的時候掛掉了將會怎樣?消息會不會因此丟失?在我們目前的代碼里,一旦RabbitMq將一條消息轉發給了一個消費者後,將會立即將消息刪除(註意Worker.java里的autoAck),因此,在我們上面例子里,如kill掉一個正在處理數據的Worker,那麼該數據將會丟失。不僅如此,所有那些指派給該Worker的還未處理的消息也會丟失。
但在實際工作的,我們並不希望一個Worker掛掉之後就會丟失數據,我們希望的是:如果該Worker掛掉了,所有轉發給該Worker的消息將會重新轉發給其他Worker進行處理(包括處理了一半的消息)。為了確保一條消息永不丟失,RabbitMq支持消息回執。消費者在接收到一條消息,並且成功處理完成之後會給RabbitMq回發一條確認ack確認消息,RabbitMq此時才會刪除該條消息。
如果一個Worker正在處理一條消息時掛掉了(通道關閉、連接關閉、TCP連接丟失),它將沒有機會發送ack回執,RabbitMq就認為該消息沒有消費成功,於是便會將該消息重新放到隊列中,如果此時有其他消費者還是線上狀態,RabbitMq會立即將該條消息再轉發給其他線上的消費者。這種機制可以保證任何消息都不會丟失。
預設情況下,需要手動進行消息確認,在前面的例子里,我們通過autoAck=true顯示地關閉了手動消息確認,因此,RabbitMq將採用自動消息確認的機制。現在,我們修改我們的程式,採用手動發送回執的方式,當我們完成對消息的處理後,再手動發送回執確認:
1 channel.basicQos(1); // accept only one unack-ed message at a time (see below) 2 3 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 4 String message = new String(delivery.getBody(), "UTF-8"); 5 6 System.out.println(" [x] Received '" + message + "'"); 7 try { 8 doWork(message); 9 } finally { 10 System.out.println(" [x] Done"); 11 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 12 } 13 }; 14 boolean autoAck = false; 15 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
ack發送通道必須和接收消息的通道(channel)是同一個,如果嘗試通過一個不同的通道發送ack回執,將會拋出channel等級協議異常(官網說會拋出異常,但是我在實際測試中並沒有拋異常,只是該條消息得不到回執,從而也無法刪除)。
一個常見的錯誤是忘了手動回執,雖然只是一個簡單的錯誤,但是帶來的後果卻是嚴重的,它將導致已經消費掉的消費不會被刪除,並且當消費該消息的消費者在退出之後,RabbitMq會將該條消息重新進行轉發,記憶體將被慢慢耗盡。我們可以通過正面的命令來檢查這種錯誤:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
該命令有三列內容,第一列是在監聽的隊列名稱,第二列是Ready狀態的消息數量,第三列是Unacked的消息數量。
4、消息的持久化
在3中我們講解瞭如何保證當消費者掛掉之後消息不被丟失,但是,如果RabbitMq服務或者部署RabbitMq的伺服器掛掉了之後,消息仍然會丟失。當RabbitMq崩潰之後,它將會忘記所有的隊列和消息,除非,有什麼機制讓RabbitMq將隊列信息和消息保存下來。
要確保消息和隊列不會丟失,我們必須要確保兩件事情。
首先,我們要確保RabbitMq永遠不丟失隊列,要做到這點,我們在定義的時候就需要告訴RabbitMq它是需要持久化的,通過指定durable參數實現:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
雖然這個命令本身是正確的,但是在我們目前它不能工作。因為我們前面已經定義了一個非持久化的hello隊列,RabbitMq不允許重新定義一個已經存在的隊列(用不同的參數),否則會拋出異常:
Exception in thread "main" java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962) at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333) at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:23) Suppressed: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406,
reply-text=PRECONDITION_FAILED - parameters for queue 'hello' in vhost '/' not equivalent, class-id=50, method-id=10) at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:396) at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:292) at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:607) at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:541) at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534) at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68) at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:29)
要麼重啟RabbitMq讓該臨時隊列消失,要麼在控制台將該隊列刪除,或者重新創建一個新的隊列:
1 boolean durable = true; 2 channel.queueDeclare("task_queue", durable, false, false, null);
生產者和消費者要做同步修改。
上面這一步,我們保證了隊列(task_quee)的持久化,此時,即便RabbitMq崩潰了也不會丟失該隊列,當RabbitMq重啟後將自動重新載入該隊列。
其次,我們需要確保我們的消息也被持久化,要做到這一點,在生產者發佈消息的時候需要指定消息的屬性為:PERSISTENT_TEXT_PLAIN。
1 import com.rabbitmq.client.MessageProperties; 2 3 channel.basicPublish("", "task_queue", 4 MessageProperties.PERSISTENT_TEXT_PLAIN, 5 message.getBytes());
註意,即便設置了消息的持久化屬性也不能保證消息會被100%地寫入到磁碟中,因為RabbitMq在接收到消息和寫入到磁碟不是同步的,有可能消息只是被寫入到緩存中而還沒來和及寫入磁碟的時候,RabbitMq崩潰了,此時也會丟失消息。但無論如何,比前面簡單的消息隊列已經強大了很多。
5、公平調度
您可能已經註意到,任務調度仍然不能完全按照我們希望的方式工作。舉個例子,在只有兩個Worker的環境中,奇數的消息比較重,偶數的消息比較輕時,一個Worker將會一直處於忙碌狀態,而另一個Worker將會一直處於空閑狀態,但RabbitMq並不知道這種情況,它會依然均衡地向兩個Worker傳遞消息。
發生這種情況是因為,當一個消息進入隊列之後,RabbitMq只是盲目地將該第n個消息轉發給第n個消費者,它並不關註每個消費者發了多少個回執。
為瞭解決這個問題,我們可以通過調用basicQos方法,給它傳入1。這將告訴RabbitMq不要同時給一個隊列轉發多於1條的消息,換句話說,在一個消費者沒有完成並回執前一條消息時,不要再給它轉發其他消息。
1 int prefetchCount = 1; 2 channel.basicQos(prefetchCount);
6、完整的代碼
一、NewTask.java
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 import com.rabbitmq.client.MessageProperties; 5 6 public class NewTask { 7 8 private static final String TASK_QUEUE_NAME = "task_queue"; 9 10 public static void main(String[] argv) throws Exception { 11 ConnectionFactory factory = new ConnectionFactory(); 12 factory.setHost("localhost"); 13 try (Connection connection = factory.newConnection(); 14 Channel channel = connection.createChannel()) { 15 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 16 17 String message = String.join(" ", argv); 18 19 channel.basicPublish("", TASK_QUEUE_NAME, 20 MessageProperties.PERSISTENT_TEXT_PLAIN, 21 message.getBytes("UTF-8")); 22 System.out.println(" [x] Sent '" + message + "'"); 23 } 24 } 25 26 }
二、Worker.java
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 import com.rabbitmq.client.DeliverCallback; 5 6 public class Worker { 7 8 private static final String TASK_QUEUE_NAME = "task_queue"; 9 10 public static void main(String[] argv) throws Exception { 11 ConnectionFactory factory = new ConnectionFactory(); 12 factory.setHost("localhost"); 13 final Connection connection = factory.newConnection(); 14 final Channel channel = connection.createChannel(); 15 16 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 17 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 18 19 channel.basicQos(1); 20 21 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 22 String message = new String(delivery.getBody(), "UTF-8"); 23 24 System.out.println(" [x] Received '" + message + "'"); 25 try { 26 doWork(message); 27 } finally { 28 System.out.println(" [x] Done"); 29 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 30 } 31 }; 32 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); 33 } 34 35 private static void doWork(String task) { 36 for (char ch : task.toCharArray()) { 37 if (ch == '.') { 38 try { 39 Thread.sleep(1000); 40 } catch (InterruptedException _ignored) { 41 Thread.currentThread().interrupt(); 42 } 43 } 44 } 45 } 46 }
至此,工作隊列模式講解完了,下一章,將講解發佈-訂閱模式。