RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。 MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通信方法。應用程式通過讀寫出入隊列的消息(針對應用程式的數據)來通信,而無需專用連接來鏈 ...
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通信方法。應用程式通過讀寫出入隊列的消息(針對應用程式的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程式之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程式通過隊列來通信。隊列的使用除去了接收和發送應用程式同時執行的要求。RabbitMQ可以,多個程式同時使用RabbitMQ ,但是必須隊列名稱不一樣。採用erlang語言,屬於愛立信公司開發的。
術語(Jargon)
- P(Producing):製造和發送信息的一方。
- Queue:消息隊列。
- C(Consuming):接收消息的一方。
1. 安裝
Ubuntu 上安裝
- 添加源、新增公鑰(不加會有警告)、更新源,安裝:
rabbitmq-server
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server
- 安裝完成後還要配置下:
# 在 rabbitmq 中添加用戶
hj@hj:~$ sudo rabbitmqctl add_user username password
Creating user "hj" # 這為設置成功後的提示,同下
# 將用戶設置為管理員(只有管理員才能遠程登錄)
hj@hj:~$ sudo rabbitmqctl set_user_tags username administrator
Setting tags for user "hj" to [administrator]
# 為用戶設置讀寫許可權
hj@hj:~$ sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
Setting permissions for user "username" in vhost "/"
Windows 上安裝
- 安裝
pika
pip3 install -i http://pypi.douban.com/simple/ pika --trusted-host pypi.douban.com
- RabbitMQ 是建立在 Erlang OTP 平臺上,所有需要下載 Erlang 和 RabbitMQ,官網上下載安裝
Erlang
和RabbitMQ
- Erlang:http://www.erlang.org/downloads
- RabbitMQ:https://www.rabbitmq.com/install-windows.html
- 將 Erlang 添加到系統環境變數中
新建一個 ERLANG_HOME,值為 ERlang 的安裝路徑(有些安裝時會自動添加):
將 ERLANG_HOME 添加到 path 中(這裡以 win10 平臺為例,其他平臺可能會不一樣):
打開 CMD
以管理員身份證運行,輸入 erl
檢查 ERlang 是否安裝成功:
C:\Windows\system32>erl
Eshell V10.3 (abort with ^G) # 版本
1> # 標識符
- rabbitmq 需要開啟後臺管理插件
rabbitmq management
2. 隊列通信
2.1 簡單示例
下麵我們來使用 RabbitMQ
來實現一個簡單的消息收發:
- 發送端:一臺 Windows 機器
- 接收端:一臺 Ubuntu 虛擬機
消息不能直接發送到隊列,而是需要經過 exchange 轉發器轉發,只有與轉發器綁定了的隊列,才能收到消息。在這裡我們假設不經過 exchange 轉發:
- 發送端:
import pika
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.xxx', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
# 聲明queue
channel.queue_declare(queue='hello')
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# 消息不能直接發送到隊列,而是需要經過 exchange 轉發器轉發,只有與轉發器綁定了的隊列,才能收到消息
channel.basic_publish(exchange='',
routing_key='hello',
body=b'Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
首先需要輸入上面第一章中已經註冊的 rabbitmq
賬戶,然後再連接遠程端。
其次再聲明瞭一個隊列 queue
,名稱為 hello
,在這裡 exchange 為空,發送的內容 body
必須是 bytes
類型。
- 接收端:
接收端也必須指定隊列名稱:
import pika
import time
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(20)
print(" [x] msg process done %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
運行結果如下:
2.2 消息持久化
我們已經知道即使消費者死亡,消息(隊列)也不會丟失(在禁用 no_ack=True的前提下,現在是 auto_ack=True)
但是如果 RabbitMQ
伺服器停止,我們的任務一樣會丟失,當 RabbitMQ
退出或奔潰時,將會忘記隊列和消息,除非我們告訴它不要這樣,那麼我們就要將隊列和消息標記為持久。
- 確保
RabbitMQ
永遠不會丟失我們的隊列,需要設置durable=True
:
# 發送端,即消息製造者
channel.queue_declare(queue='task_queue', durable=True)
- 將消息標記為持久性:
# 發送端,即消息製造者
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent 使消息持久
)
設置好之後,發送端先發送一條消息,接收端先不要啟動。使用以下命令關閉啟動 rabbitmq
服務,觀察隊列和消息會不會真正丟失:
# 若命令運行失敗,可以嘗試使用 管理員模式 sudo
# 啟動rabbitmq
service rabbitmq-server start
# 停止rabbitmq
service rabbitmq-server stop
# 重啟rabbitmq
service rabbitmq-server restart
# 查看當前活動的隊列
rabbitmqctl list_queues
2.3 公平分發
所謂公平分發即一個生產者,多個消費者,類似於負載均衡。
下麵我將設置一個發送端,兩個接收端:
- 發送端:
import pika
import time
import sys
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
# 聲明queue
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
routing_key='task_queue',
body=bytes(message, encoding='utf-8'),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent 使消息持久
)
)
print(" [x] Sent %r" % message)
connection.close()
- 接收端:
import pika
import time
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
def callback(ch, method, properties, body):
print(" [x] Received %r" % body) # b'Hello World! 1557373639.5839057'
time.sleep(20)
print(" [x] Done")
print("method.delivery_tag", method.delivery_tag) # 1
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(on_message_callback=callback, queue='task_queue')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
另外一個接收端代碼一致,在此省略,運行結果如下:
2.4 根據實際情況分發消息
事實上伺服器之間接收、處理消息的能力是不一樣的,受網路、配置等因素影響,因此公平分發消息就會導致以下問題出現:
- 配置高、網路好的伺服器處理消息能力強、快
- 配置一般、網路不好的伺服器有可能就會積壓很多未處理的消息
為此我們可以在接收端設置 prefetch_count=1
,如果前面還有消息未處理,就告訴發送端不要給我發消息,直至處理完畢前一條消息為止:
channel.basic_qos(prefetch_count=1) # 如果前面有消息沒處理完,就不要給我再發消息
3. 訂閱(廣播)
上面的例子基本上都是一對一發送和接收消息,如果想要將消息發送到所有隊列(queue)中,那麼就需要用到廣播了,而實現廣播的一個重要參數就是 exchange
—— 消息轉發器。
exchange 在定義時是有類型的,只有符合條件的才能接收消息,大致可分為以下幾類:
- fanout(全民廣播):凡是綁定 exchange 的隊列都可以接收到消息
- direct(組播):以組為單位接收消息,如:發送到某個組,那麼這個組裡的所有隊列都能接收,
routingKey
為關鍵字/組名 - topic(根據特征收發消息):所有符合
routingKey
綁定的隊列都可以接收消息
3.1 fanout 方式
所有綁定 exchange 的 queue 都能接收到消息。
應用場景:視頻直播
- 發送端:
import pika
import sys
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
# 指定 exchange 類型、名字
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=bytes(message, encoding='utf-8'))
print(" [x] Sent %r" % message)
connection.close()
- 接收端:
import pika
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 不指定queue名字, rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
# 最新源代碼需要執行 queue,如果為 '',則 if empty string, the broker will create a unique queue name
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
# result = <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=amq.gen-hRrQ-pwaT9u-32CcIokCxA'])>"])>
# queue_name = amq.gen-hRrQ-pwaT9u-32CcIokCxA
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(on_message_callback=callback, queue=queue_name)
channel.start_consuming()
打開兩個終端,分別運行:
python3 fanout_send.py t1
python3 fanout_send.py t2
運行結果如下:
3.2 direct 方式
RabbitMQ
還可以根據關鍵字發送接收消息,隊列綁定關鍵字,發送端根據關鍵字發送到 exchange,exchange 再根據關鍵字判斷發給哪個隊列。
- 發送端:
import pika
import sys
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# python3 direct_send.py info
severity = sys.argv[1] if len(sys.argv) > 1 else 'info' # 嚴重程度,級別, info
message = ' '.join(sys.argv[2:]) or 'Hello World!' # Hello World!
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=bytes(message, encoding='utf-8'))
print(" [x] Sent %r:%r" % (severity, message)) # [x] Sent 'info' : 'Hello World!'
connection.close()
- 接收端:
import pika
import sys
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
# python3 direct_recv.py info warning error
# python3 direct_recv.py info
# python3 direct_recv.py error
severities = sys.argv[1:] # ['direct_recv.py', 'info', 'warning', 'error']、['direct_recv.py', 'error']、['direct_recv.py', 'info']
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
# 迴圈綁定關鍵字
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(on_message_callback=callback, queue=queue_name)
channel.start_consuming()
接收端執行打開三個終端,分別執行:
python3 direct_recv.py info warning error
python3 direct_recv.py info
python3 direct_recv.py error
然後迴圈關鍵字,綁定隊列(queue),發送端執行相應關鍵字,接收端這邊就能根據關鍵字接收消息。
運行結果如下:
3.3 topic 方式
- 發送端:
import pika
import sys
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=bytes(message, encoding='utf-8'))
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
- 接收端:
import pika
import sys
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(on_message_callback=callback, queue=queue_name)
channel.start_consuming()
接收端開啟四個終端,發送端開啟一個:
# 接收端
python3 topic_recv.py *.django.* # 消息兩端可以是任意,中間只要是 django 即可
python3 topic_recv.py # # 可以接收任意消息
python3 topic_recv.py mysql.* # 以 mysql 開頭,結尾可以是任意
python3 topic_recv.py mysql.error.* # mysql.error 開頭,結尾任意
# 發送端
python3 topic_send.py mysql.error.info
python3 topic_send.py ss.django.123
python3 topic_send.py mysql.error err happend
python3 topic_send.py python.error test
運行結果如下:
總結
#
號能匹配任意消息,相當於廣播*
號也可以匹配任意,但是必須和其他一起使用
4. RPC(Remote Procedure Call)雙向傳輸
上面收發消息都是單向的,即一個發一個接收,接收的不能夠發送。而 RPC 是雙向的,既能夠發送也能接收。
應用場景:RPC 服務功能
- 發送端:
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
credentials = pika.PlainCredentials('username', 'password')
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = self.connection.channel() # 建立 rabbit 協議通道
self.channel = self.connection.channel()
result = self.channel.queue_declare('', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(on_message_callback=self.on_response,
queue=self.callback_queue,
auto_ack=True) #準備接受命令結果
def on_response(self, ch, method, props, body):
""""callback方法"""
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4()) #唯一標識符
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
count = 0
while self.response is None:
self.connection.process_data_events() #檢查隊列里有沒有新消息,但不會阻塞
count += 1
print("check...", count)
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(5)
print(" [.] Got %r" % response)
- 接收端:
import pika
import time
credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.21.128', credentials=credentials))
channel = connection.channel() # 建立 rabbit 協議通道
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message_callback=on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
運行結果如下:
5. 參考:
- RabbitMQ基本概念(二):windows下安裝
- ubuntu下使用apt-get一步步安裝rabbitmq
- Ubuntu上安裝和使用RabbitMQ
- RabbitMQ在Ubuntu 16.04下的安裝與配置
- RabbitMQ 入門
- Python併發編程-RabbitMQ消息隊列
- windows下 安裝 rabbitMQ 及操作常用命令
6. 常用命令
#創建用戶
rabbitmqctl add_user rabbitadmin 123456
rabbitmqctl set_user_tags rabbitadmin administrator
# 給用戶授權
rabbitmqctl set_permissions -p / rabbitadmin ".*" ".*" ".*"
# 開啟插件管理頁面
rabbitmq-plugins enable rabbitmq_management
rabbitmq-server start # 啟動服務
rabbitmq-server stop # 關閉服務
rabbitmq-server restart # 重啟服務
rabbitmq-server status # 查看服務狀態
ps -ef|grep rabbitmq # 查看埠
rabbitmqctl list_queues # 查看隊列消息
./rabbitmqctl list_users # 查看用戶列表命令
rabbitmqctl delete_user Username # 刪除用戶命令
whereis rabbitmq #查看rabbitmq安裝目錄