在說正題之前先解釋一下交換機模式是個籠統的稱呼,它不是一個單獨的模式(包括了訂閱模式,路由模式和主題模式),交換機模式是一個比較常用的模式,主要是為了實現數據的同步。 首先,說一下訂閱模式,就和字面上的意思差不多主要就是一個生產者,多個消費者,同一個消息被多個消費者獲取,先看一下官網的圖示 整體執行 ...
在說正題之前先解釋一下交換機模式是個籠統的稱呼,它不是一個單獨的模式(包括了訂閱模式,路由模式和主題模式),交換機模式是一個比較常用的模式,主要是為了實現數據的同步。
首先,說一下訂閱模式,就和字面上的意思差不多主要就是一個生產者,多個消費者,同一個消息被多個消費者獲取,先看一下官網的圖示
整體執行過程就和圖裡一樣,生產者把消息發送到交換機,然後隊列綁定到交換機,消息由交換機發送到隊列,每一個隊列都有一個各自的消費者。這樣
就實現了一個消息被多個消費者所獲取,而且如果有新的消費者加入直接綁定隊列到交換機就可以了,大大的降低了系統間的耦合度。還有一點要註意的就是
當我們把消息發送到一個沒有隊列綁定的交換機時,消息就會丟失,因為消息只能存儲在隊列,而交換機只做交換,不做存儲!
生產者代碼:
public class Send { private final static String EXCHANGE_NAME = "exchange_name"; //交換機名稱 public static void main(String[] argv) throws Exception { // 獲取MQ連接和通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息內容 String message = "生產者消息"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" 發送 '" + message + "'"); channel.close(); connection.close(); } }
消費者一號代碼:
public class Recv { private final static String QUEUE_NAME = "test_queue_ex"; private final static String EXCHANGE_NAME = "exchange_name"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消費者一號 '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消費者二號代碼:
public class Recv2 { private final static String QUEUE_NAME = "test_queue_ex2"; private final static String EXCHANGE_NAME = "exchange_name"; public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("消費者二號 '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
運行代碼之後可以看到
生產者發送的消息已經存儲在了交換機之中。查看綁定關係如下圖所示:
所以,可以得出結論一個消息被多個消費者所消費。訂閱模式也存在著缺陷有時並不是所有數據都需要同步,所以用訂閱模式來做數據同步並不合理。於是就用到了路由模式。
路由模式
官網圖示如下:
和訂閱模式比較類似,只是type變成了direct類型,路由模式也是先由生產者發送消息到交換機,然後在根據綁定鍵來判斷消息發送到哪一個交換機。如下圖:
和訂閱模式的區別就是生產者發送消息時要先聲明消息的類型,也就是說消息會被哪類消費者所獲取
消費者和生產者保持一個類型的時候,就可以接收到對應生產者所發送的消息了。從而可以過濾掉不需要的消息類型。
主題模式
主題模式個人感覺就和sql語句里的like關鍵字一樣,不用保證消息類型一樣,只要保證其相似就可以接收消息了,相比於路由模式,
主題模式匹配率比較低,但是功能確提高了很多,減少了路由key的創建,如圖所示:
type變成了topic類型,至於其他方面和路由模式一樣就不多說了。