參考資料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 前言 RabbitMQ是一個中間人,它接受和轉發消息。我們可以把它想象成一個郵局:當你把郵件投入郵箱的時候,你可以確信它最終會被投遞到收件人的手中。RabbitMQ就是那個郵箱、郵局和郵差。區別就在 ...
參考資料:RabbitMQ tutorial - "Hello world!" — RabbitMQ
前言
RabbitMQ
是一個中間人,它接受和轉發消息。我們可以把它想象成一個郵局:當你把郵件投入郵箱的時候,你可以確信它最終會被投遞到收件人的手中。RabbitMQ
就是那個郵箱、郵局和郵差。區別就在於RabbitMQ
投遞的是二進位的消息數據。
這裡有一些術語需要說明。
發送、產生消息的程式我們稱之為生產者producer
,使用此圖標表示
隊列queue
就是一個有名字的郵箱,雖然消息會在RabbitMQ和你的應用程式之間流動,但是它們可以持久保存在隊列中。隊列會收到主機的記憶體和磁碟容量的限制。多個生產者可以向一個隊列發送消息,同時多個消費者consumer
也可以從一個隊列中消費/接收消息。
隊列的圖標如下
接收、消費消息的程式我們稱之為消費者consumer
,使用此圖標表示
註意,生產者、消費者以及RabbitMQ
不需要位於同一臺伺服器上。事實上,絕大多數情況中,它們都分別位於不同的伺服器上。生產者和消費者可以是同一個程式實現。
這裡我們的實驗環境,生產者和消費者是獨立的程式文件,但是它們以及RabbitMQ
都位於同一臺伺服器上。
Hello World!
學習
RabbitMQ
,其中的生產者和消費者需要用戶自己使用編程語言來實現。我本人是一名運維工程師,有過bash, awk的經驗,2017年學習廖雪峰的Python學了一半,近期又學習了B站高淇老師的Java前三章(包含面向對象),對於Python和Java語言算是一知半解,大概看得懂又寫不出來的水平。我自己嘗試了下,即使沒有編程基礎,只要我們嚴格按照官方的教程指導,也是可以將這些代碼實現的。
這部分我們使用python手寫生產者和消費者,使用RabbitMQ
自帶的guest
賬戶。生產者會發送一條簡單的消息給到名為hello
的隊列,消費者從隊列會收到這條消息並將其列印出來。
簡單的流程圖如圖所示,隊列就有點類似於代表了消費者去接收了這條消息。
我們的操作系統自帶了python 3.9,無論是鍵入python又或者是python3,都是指向python3,這點可以從字元鏈接裡面看出來。
[root@rabbitmq-01 ~]# ls -l $(which python)
lrwxrwxrwx. 1 root root 9 Dec 12 20:51 /usr/bin/python -> ./python3
[root@rabbitmq-01 ~]# ls -l $(which python3)
lrwxrwxrwx. 1 root root 9 Dec 12 20:42 /usr/bin/python3 -> python3.9
我們需要使用到 Pika
庫來使得我們的python代碼可以連接RabbitMQ,這個庫也是官方推薦的。
python -m pip install pika --upgrade
如果沒有安裝pip
的話,可以使用
python -m ensurepip --upgrade
接下來就可以開始正式寫python
代碼了。
Sending
我們的第一個程式send.py
會發送一條消息到隊列hello
中。首先我們需要建立連接。
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
連接建立完畢,在發送消息之前,我們需要確保隊列hello
必須存在,否則我們發送出去的消息就會被丟棄。因此我們創建隊列。
channel.queue_declare(queue='hello')
此時我們可以開始發送消息了,我們計劃發送消息Hello World!
到隊列hello
中去。
在mq中,消息無法直接發送到隊列中去,必須要經過exchange
。目前我們還不需要對其進行展開理解,大家直到這麼個概念即可,後續在講解RabbitMQ tutorial - Publish/Subscribe — RabbitMQ
的時候會涉及到。
我們現在只需要將其發送給預設的exchange
,使用空字元串來識別它。它是一個特殊的exchange
,允許我們指定要發送消息的隊列,隊列使用routing_key
參數來指定。
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
這裡的print只是用來告訴用戶我們做了什麼。
消息發送完畢了,在我們退出程式前,我們應該確保網路buffer
已經被清空,我們的消息真的發出去了。這裡通過優雅地關閉連接來實現。
connection.close()
發送到這裡就結束了,如果消息沒有發送成功的話,可以考慮從日誌入手排查問題。
Receiving
第二個程式是消費者receive.py
,它將會從隊列中接收消息並將其列印出來。
首先我們需要連接到伺服器上,這部分代碼和生產者代碼相同。
接下來,同樣我們需要確保隊列的存在性。使用queue_declare
創建隊列是冪等(idempotent)的,即使執行多次也只會創建一個隊列。
channel.queue_declare(queue='hello')
之所以在這裡重覆創建隊列,是為了
- 我們可能不知道生產者和消費者程式,哪個會先運行,因此最好在兩端都創建隊列。
- 創建隊列的操作需要具備冪等性。
從隊列中接收一個消息會更加複雜,它工作方式是訂閱一個callback
函數到隊列上。每當隊列中有消息出現的時候,callback
函數都會被Pika
庫調用。在本案例中,callback
函數會列印消息在屏幕上。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
接下來我們告訴mq,這個特定的callback
函數需要從隊列hello
接收消息。這一步需要確保隊列已存在,好在我們上面的代碼已經確保過了。
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
auto_ack
參數將會在RabbitMQ tutorial - Work Queues — RabbitMQ
中講解。
接下來我們進入無限迴圈,在迴圈中我們等待隊列中一旦出現新的消息,我們就會將其消費掉然後輸出消息的內容。
直到用戶輸入Ctrl+C
來停止程式。
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
Putting it all together
send.py
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive.py
#!/usr/bin/env python
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
把python代碼文件放到伺服器上,準備開始測試。
首先打開第一個終端,運行消費者程式receive.py
,它會占用前端一直運行下去,一旦有消息就會列印,直到來自用戶的終止指令Ctrl+C
[root@rabbitmq-01 code]# python receive.py
[*] Waiting for messages. To exit press CTRL+C
打開第二個終端,運行生產者程式send.py
,每運行一次它都會向隊列hello
發送一條消息Hello World!
同時輸出到終端,隨後它就會退出,不會占用前端資源。
[root@rabbitmq-01 code]# python send.py
[x] Sent 'Hello World!'
此時我們回到消費者終端,它會按照程式中說的消費掉隊列中的消息並將其輸出。
[root@rabbitmq-01 code]# python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
send.py
可以反覆執行。
[root@rabbitmq-01 code]# python send.py
[x] Sent 'Hello World!'
[root@rabbitmq-01 code]# python send.py
[x] Sent 'Hello World!'
[root@rabbitmq-01 code]# python send.py
[x] Sent 'Hello World!'
[root@rabbitmq-01 code]# python send.py
[x] Sent 'Hello World!'
消費者程式都可以捕獲。
[root@rabbitmq-01 code]# python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
[x] Received 'Hello World!'
[x] Received 'Hello World!'
[x] Received 'Hello World!'
我們可以通過rabbitmqctl list_queues
來查看mq實例中當前存在的隊列以及隊列中的消息數量。
[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 0
因為消息在一瞬間就被消費掉了,所以我們看到的消息數量都會是0。
想要退出的話就是在消費中終端執行Ctrl+C
[x] Received 'Hello World!'
[x] Received 'Hello World!'
^CInterrupted
本片文章的內容就到此為止了,我們學會了RabbitMQ的基本概念(生產者、消費者和隊列)。