人一輩子最值得炫耀的不應該是你的財富有多少(雖然這話說得有點違心,呵呵),而是你的學習能力。技術更新迭代的速度非常快,那作為程式員,我們就應該擁有一顆擁抱變化的心,積極地跟進。 在 RabbitMQ 入門之前,我已經入門了 Redis、Elasticsearch 和 MongoDB,這讓我感覺自己富 ...
人一輩子最值得炫耀的不應該是你的財富有多少(雖然這話說得有點違心,呵呵),而是你的學習能力。技術更新迭代的速度非常快,那作為程式員,我們就應該擁有一顆擁抱變化的心,積極地跟進。
在 RabbitMQ 入門之前,我已經入門了 Redis、Elasticsearch 和 MongoDB,這讓我感覺自己富有極客精神,非常良好。
小伙伴們在繼續閱讀之前,我必須要聲明一點,我對 RabbitMQ 並沒有進行很深入的研究,僅僅是因為要用,就學一下。但作為一名負責任的技術博主,我是動了心的,這篇入門教程,小伙伴們讀完後絕對會感到滿意,忍不住無情地點贊,以及赤裸裸地轉發。
當然了,小伙伴們遇到文章中有錯誤的地方,不要手下留情,可以組團過來捶我,但要保證一點,不要打臉,我怕毀容。
01、RabbitMQ 是什麼
首先,我知道,Rabbit 是一隻兔子(哎呀媽呀,忍不住秀了一波自己的英語功底),可愛的形象已經躍然於我的腦海中了。那 MQ 又是什麼呢?是 Message Queue 的首字母縮寫,也就是說 RabbitMQ 是一款開源的消息隊列系統。
RabbitMQ 的主要特點在於健壯性好、易於使用、高性能、高併發、集群易擴展,以及強大的開源社區支持。反正就是很牛逼的樣子。
九年前我做大宗期貨交易的時候,也需要消息推送,那時候還不知道去找這種現成的中間件,就用自定義的隊列實現,結果搞了不少 bug,有些到現在還沒有解決,真的是不堪迴首的往事啊。
下圖是 RabbitMQ 的消息模型圖(來源於網路,侵刪),小伙伴們來感受下。
1)P 是 Producer,代表生產者,也就是消息的發送者,可以將消息發送到 X
2)X 是 Exchange(為啥不是 E,我也很好奇),代表交換機,可以接受生產者發送的消息,並根據路由將消息發送給指定的隊列
3)Q 是 Queue,也就是隊列,存放交換機發送來的消息
4)C 是 Consumer,代表消費者,也就是消息的接受者,從隊列中獲取消息
聽我這樣一解釋,是不是對 RabbitMQ 的印象就很具象化了?小伙伴們,學起來吧!
02、安裝 Erlang
咦,怎麼不是安裝 RabbitMQ 啊?先來看看官方的解釋。
英文看不太懂,沒關係,我來補充兩人話。RabbitMQ 伺服器是用 Erlang 語言編寫的,它的安裝包里並沒有集成 Erlang 的環境,因此需要先安裝 Erlang。小伙伴們不要擔心,Erlang 安裝起來沒有任何難度。
Erlang 下載地址如下:
https://erlang.org/download/otp_versions_tree.html
最新的版本是 23.0.1,我選擇的是 64 位的版本,104M 左右。下載完就可以雙擊運行安裝,傻瓜式的。
需要註意的是,我安裝的過程中,電腦重啟了一次,好像要安裝一個什麼庫,重啟之前忘記保存圖片了(sorry)。重啟後,重新雙擊運行 otp_win64_23.0.1.exe 文件完成 Erlang 安裝。
03、安裝 RabbitMQ
Erlang 安裝成功後,就可以安裝 RabbitMQ 了。下載地址如下所示:
https://www.rabbitmq.com/install-windows.html
找到下圖中的位置,選擇紅色框中的文件進行下載。
安裝包只有 16.5M 大小,還是非常輕量級的。下載完後直接雙擊運行 exe 文件就可以傻瓜式地安裝了。
安裝成功後,就可以將 RabbitMQ 作為 Windows 服務啟動,可以從“開始”菜單管理 RabbitMQ Windows 服務。
點擊「RabbitMQ Command Prompt (sbin dir)」,進入命令行,輸入 rabbitmqctl.bat status
可確認 RabbitMQ 的啟動狀態。
可以看到 RabbitMQ 一些狀態信息:
- 進程 ID,也就是 PID 為 2816
- 操作系統為 Windows
- 當前的版本號為 3.8.4
- Erlang 的配置信息
命令行界面看起來不夠優雅,因此我們可以輸入以下命令來啟用客戶端管理 UI 插件:
rabbitmq-plugins enable rabbitmq_management
看到以下信息就可以確認插件啟用成功了。
在瀏覽器地址欄輸入 http://localhost:15672/ 可以進入管理端界面,如下圖所示:
04、在 Java 中使用 RabbitMQ
有些小伙伴可能會問,“二哥,我是一名 Java 程式員,我該如何在 Java 中使用 RabbitMQ 呢?”這個問題問得好,這就來,這就來。
第一步,在項目中添加 RabbitMQ 客戶端依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
第二步,我們來模擬一個最簡單的場景,一個生產者發送消息到隊列中,一個消費者從隊列中讀取消息並列印。
官方對 RabbitMQ 有一個很好的解釋,我就“拿來主義”的用一下。在我上高中的年代,同學們之間最流行的交流方式不是 QQ、微信,甚至簡訊這些,而是書信。因為那時候還沒有智能手機,況且上學期間學校也是命令禁用手機的,所以書信是情感表達的最好方式。好懷念啊。
假如我向女朋友小巷寫了一封情書,內容如下所示:
致小巷
你好呀,小巷。
你走了以後我每天都感到很悶,就像堂吉訶德一樣,每天想念托波索的達辛妮亞。我現在已經養成了一種習慣,就是每兩三天就要找你說幾句不想對別人說的話。
。。。。。。
王二,5月20日
那這封情書要寄給小巷,我就需要跑到郵局,買上郵票,投遞到郵箱當中。女朋友要收到這封情書,就需要郵遞員盡心儘力,不要弄丟了。
RabbitMQ 就像郵局一樣,只不過處理的不是郵件,而是消息。之前解釋過了,P 就是生產者,C 就是消費者。
新建生產者類 Wanger :
public class Wanger {
private final static String QUEUE_NAME = "love";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "小巷,我喜歡你。";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [王二] 發送 '" + message + "'");
}
}
}
1)QUEUE_NAME 為隊列名,也就是說,生產者發送的消息會放到 love 隊列中。
2)通過以下方式創建伺服器連接:
ConnectionFactory factory = new ConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
ConnectionFactory 是一個非常方便的工廠類,可用來創建到 RabbitMQ 的預設連接(主機名為“localhost”)。然後,創建一個通道( Channel)來發送消息。
Connection 和 Channel 類都實現了 Closeable 介面,所以可以使用 try-with-resource 語句,如果有小伙伴對 try-with-resource 語句不太熟悉,可以查看我之前寫的我去文章。
3)在發送消息的時候,必須設置隊列名稱,通過 queueDeclare()
方法設置。
4)basicPublish()
方法用於發佈消息:
- 第一個參數為交換機(exchange),當前場景不需要,因此設置為空字元串;
- 第二個參數為路由關鍵字(routingKey),暫時使用隊列名填充;
- 第三個參數為消息的其他參數(BasicProperties),暫時不配置;
- 第四個參數為消息的主體,這裡為 UTF-8 格式的位元組數組,可以有效地杜絕中文亂碼。
生產者類有了,接下來新建消費者類 XiaoXiang:
public class XiaoXiang {
private final static String QUEUE_NAME = "love";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [小巷] 接收到的消息 '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
1)創建通道的代碼和生產者差不多,只不過沒有使用 try-with-resource 語句來自動關閉連接和通道,因為我們希望消費者能夠一直保持連接,直到我們強制關閉它。
2)在接收消息的時候,必須設置隊列名稱,通過 queueDeclare() 方法設置。
3)由於 RabbitMQ 將會通過非同步的方式向我們推送消息,因此我們需要提供了一個回調,該回調將對消息進行緩衝,直到我們做好準備接收它們為止。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [小巷] 接收到的消息 '" + message + "'");
};
basicConsume()
方法用於接收消息:
第一個參數為隊列名(queue),和生產者相匹配(love)。
第二個參數為 autoAck,如果為 true 的話,表明伺服器要一次性交付消息。怎麼理解這個概念呢?小伙伴們可以在運行消費者類 XiaoXiang 類之前,先多次運行生產者類 Wanger,向隊列中發送多個消息,等到消費者類啟動後,你就會看到多條消息一次性接收到了,就像下麵這樣。
等待接收消息
[小巷] 接收到的消息 '小巷,我喜歡你。'
[小巷] 接收到的消息 '小巷,我喜歡你。'
[小巷] 接收到的消息 '小巷,我喜歡你。'
第三個參數為 DeliverCallback,也就是消息的回調函數。
第四個參數為 CancelCallback,我暫時沒搞清楚是幹嘛的。
在消息發送的過程中,也可以使用 RabbitMQ 的管理面板查看到消息的走勢圖,如下所示。
05、鳴謝
好了,我親愛的小伙伴們,以上就是本文的全部內容了,是不是看完後很想實操一把 RabbitMQ,趕快行動吧!如果你在學習的過程中遇到了問題,歡迎隨時和我交流,雖然我也是個菜鳥,但我有熱情啊。
另外,如果你想寫入門級別的文章,這篇就是最好的範例。