本博客代碼運行環境 什麼是MQ 本教程pdf及代碼下載地址:代碼:https://download.csdn.net/download/zpcandzhj/10585077教程:https://download.csdn.net/download/zpcandzhj/10585092 RabbitM ...
本博客代碼運行環境
ErLang: ErLang_X64_22 version RabbitMQ: RabbitMQ_Server_3.7.15 version python : Python 3.7.1rc1 version pip : pip 19.1.1 version pika : pika 1.0.1 version
什麼是MQ
消息隊列(Message Queue,簡稱MQ),從字面意思上看,本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是message而已。 其主要用途:不同進程Process/線程Thread之間通信。 為什麼會產生消息隊列?有幾個原因: 不同進程(process)之間傳遞消息時,兩個進程之間耦合程度過高,改動一個進程,引發必須修改另一個進程,為了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),所有兩進程之間傳遞的消息,都必須通過消息隊列來傳遞,單獨修改某一個進程,不會影響另一個; 不同進程(process)之間傳遞消息時,為了實現標準化,將消息的格式規範化了,並且,某一個進程接受的消息太多,一下子無法處理完,並且也有先後順序,必須對收到的消息進行排隊,因此誕生了事實上的消息隊列; 關於消息隊列的詳細介紹請參閱: 《Java帝國之消息隊列》 《一個故事告訴你什麼是消息隊列》 《到底什麼時候該使用MQ》 MQ框架非常之多,比較流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿裡開源的RocketMQ。本文主要介紹RabbitMq。
本教程pdf及代碼下載地址:
代碼:https://download.csdn.net/download/zpcandzhj/10585077
教程:https://download.csdn.net/download/zpcandzhj/10585092
RabbitMQ
RabbitMQ簡介
1、MQ 為 Message Queue, 消息隊列是應用程式和應用程式之間的通信方法, 2、RabbitMQ 是一個開源的, 在AMQP 基礎上完整的,可復用的企業消息系統,消息中間件 , 消息隊列 3、支持主流的 OS, Linux, Windows, MacOX 等, 4、多種開發語言支持, java、Python、Ruby、.NET、PHP、C/C++、node.js 等 5、是專業級別的, 甩 python 的隊列好幾條街 6、開發語言: Erlang----面向併發的編程語言----- 愛立信公司, 可以做到 熱插拔, 局部載入, 不需要重啟整個服務
AMQP: 消息隊列的一個協議。
搭建RabbitMQ環境:windows下安裝
第一步 提供 Erlang 編程語言環境-----》安裝 Erlang
官網: http://www.erlang.org/download ,
安裝RabbitMQ
官網: https://www.rabbitmq.com/install-windows.html
安裝完成: 打開 cmd 命令行工具, cd 到 RabbitMQ 的安裝目錄下的 sbin/ 子目錄 中。 如圖:
1、啟動管理工具插件: rabbitmq-plugins enable rabbitmq_management
2、啟動 RabbitMQ 服務:net start rabbitmq
3、瀏覽器輸入地址: http://127.0.0.1:15672/
4、使用 預設賬號管理: guest/ guest , 能夠登陸 ,說明安裝成功
4.1、 添加 admin 用戶:
4.2、用戶角色:
1、超級管理員(administrator) 可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。 2、監控者(monitoring) 可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,記憶體使用情況,磁碟使用情況等) 3、策略制定者(policymaker) 可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。 4、普通管理者(management) 僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。 5、其他 無法登陸管理控制台,通常就是普通的生產者和消費者。
4.3、創建Virtual Hosts
選中Admin用戶,設置許可權:
看到許可權已加
4.4、 管理界面中的功能:
4.5、 管理工具中 查看隊列消息:
點擊上面的隊列名稱,查詢具體的隊列中的信息:
使用 rabbitMQ 命令 添加用戶並設置許可權的步驟:
1. 創建用戶: rabbitmqctl add_user name password 2. 設置用戶角色: rabbitmqctl set_user_tags name administrator 3. 遠程連接需設置用戶許可權, 代表允許從外面訪問: rabbitmqctl set_permissions -p / name ".*" ".*" ".*" 解析: set_permissions [-p vhost] {user} {conf} {write} {read}
RabbitMQ 常用命令:
1 1、rabbitmq 管理器插件的啟動和關閉: 2 **啟動監控管理器:rabbitmq-plugins enable rabbitmq_management 3 關閉監控管理器:rabbitmq-plugins disable rabbitmq_management 4 2、服務的啟動與關閉: 5 **啟動rabbitmq:rabbitmq-service start 6 關閉rabbitmq:rabbitmq-service stop 7 **使用 windows 命令: net start rabbitmq 8 net stop rabbitmq 9 3、rabbitmq伺服器的啟動和關閉: 10 前臺啟動: rabbitmq-server start 11 後臺啟動: rabbitmq-server -detached 12 前臺停止:rabbitmqctl stop 13 查看 RabbitMQ 的狀態: rabbitmqctl status 14 4、rabbitmq 應用管理: 15 關閉應用:rabbitmqctl stop_app 16 啟動應用:rabbitmqctl start_app 17 5、用戶管理: 18 **添加用戶: rabbitmqctl add_user username password 19 列出所有用戶: rabbitmqctl list_users 20 刪除用戶: rabbitmqctl delete_user username 21 **修改用戶密碼: rabbitmqctl change_password username 22 newpassword 23 6、角色管理: 24 **分配角色:rabbitmqctl set_user_tags username administrator 25 26 角色說明 27 none 最小許可權角色 28 management 管理員角色 29 policymaker 決策者 30 monitoring 監控 31 administrator 超級管理員 32 7. 許可權管理: 33 清除用戶許可權: rabbitmqctl clear_permissions -p vhostpath user 34 **設置用戶許可權: rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*" 35 8、虛擬主機管理: 36 列出所有虛擬主機: rabbitmqctl list_vhosts 37 創建虛擬主機: rabbitmqctl list_vhost vhostpath 38 刪除虛擬主機: rabbitmqctl delete_vhost vhostpath 39 列出虛擬主機所有許可權:rabbitmqctl list_permissions -p vhostpath 40 9、隊列管理: 41 **查看所有的隊列:rabbitmqctl list_queues 42 清除所有的隊列:rabbitmqctl reset 43 查看所有綁定: rabbitmqctl list_bindings 44 查看所有通道: rabbitmqctl list_channels 45 查看所有連接: rabbitmqctl list_connections 46 列出所有消費者: rabbitmqctl list_consumers 47 **列出所有交換機: rabbitmqctl list_exchanges
Python 操作 RabbitMQ 之深入淺出
此博客代碼托管地址: https://github.com/SuoSuo-Rocky/RabbitMQ-FullStack
安裝 rabbitMQ module
pip install pika or easy_install pika or 源碼 : https://pypi.python.org/pypi/pika
實現最簡單的隊列通信
send端
1 # 發送端, 消費者 2 import pika 3 4 credentials = pika.PlainCredentials('shiwei', 'shiwei666666') 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials)) 6 7 # 在連接之上創建一個 rabbit 協議的通道 8 channel = connection.channel() 9 10 # 在通道中 聲明 一個 queue 11 channel.queue_declare(queue='hello') 12 13 # 一個消息永遠不能直接發送到隊列,它總是需要經過一個交換 14 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 15 channel.basic_publish(exchange='', # 交換機 16 routing_key='hello', # 路由鍵,寫明將消息發往哪個隊列,本例是將消息發往隊列hello 17 body='Hello World!') # 生產者要發送的消息 內容 18 print(" [x] Sent 'Hello World!'") 19 connection.close() # 當生產者發送完消息後,可選擇關閉連接
receive端
import pika import time credentials = pika.PlainCredentials('shiwei', 'shiwei666666') connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials)) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. # 聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行 channel.queue_declare(queue='hello') # 定義一個回調函數,用來接收生產者發送的消息 def callback(ch, method, properties, body): print("received msg...start processing....",body) time.sleep(5) print(" [x] msg process done....",body) channel.basic_consume(on_message_callback=callback, # 定義一個回調函數,用來接收生產者發送的消息 auto_ack=True, queue='hello', # 指定取消息的隊列名 ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() #開始迴圈取消息
實現功能
(1)rabbitmq迴圈調度公平分發,將消息迴圈發送給不同的消費者,如:消息1,3,5發送給消費者1;消息2,4,6發送給消費者2。 (2)消息確認機制,為了確保一個消息不會丟失,RabbitMQ支持消息的確認 , 一個 ack(acknowlegement) 是從消費者端發送一個確認去告訴RabbitMQ 消息已經接收了、處理了,RabbitMQ可以釋放並刪除掉了。如果一個消費者死掉了(channel關閉、connection關閉、或者TCP連接斷開了)而沒有發送ack,RabbitMQ 就會認為這個消息沒有被消費者處理,並會重新發送到生產者的隊列里,如果同時有另外一個消費者線上,rabbitmq將會將消息很快轉發到另外一個消費者中。 那樣的話你就能確保雖然一個消費者死掉,但消息不會丟失。 這個是沒有超時的,當消費方(consumer)死掉後RabbitMQ會重新轉發消息,即使處理這個消息需要很長很長時間也沒有問題。消息的 acknowlegments 預設是打開的,在前面的例子中關閉了: auto_ack = True . 現在刪除這個標識 然後 發送一個 acknowledgment。
消息持久化
消息持久化,將消息寫入硬碟中。 註意: (1)、RabbitMQ不允許你重新定義一個已經存在、但屬性不同的queue, 否則報錯 (2)、標記消息為持久化並不能完全保證消息不會丟失,儘管已經告訴RabbitMQ將消息保存到磁碟,但 RabbitMQ接收到的消息在還沒有保存的時候,仍然有一個短暫的時間視窗。RabbitMQ不會對每個消息都執行同步 - -- 可能只是保存到緩存cache還沒有寫入到磁碟中。因此這個持久化保證並不是很強,但這比我們簡單的任務queue要 好很多,如果想要很強的持久化保證,可以使用 publisher confirms。 公平調度: 在一個消費者未處理完一個消息之前不要分發新的消息給它,而是將這個新消息分發給另一個不是很忙的消費 者進行處理。為瞭解決這個問題我們可以在消費者代碼中使用 channel.basic.qos (prefetch_count = 1 ),將消費者設置為公平調度
生產者
import pika import sys username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) chann = conn.channel() # 源碼: """ # def queue_declare(self, # channel.queueDeclare 用來創建隊列,有5個參數: # queue, # String queue, 隊列名; # passive=False, # # durable=False, # boolean durable, 該隊列是否需要持久化 # exclusive=False, # boolean exclusive,該隊列是否為該通道獨占的(其他通道是否可以消費該隊列) # auto_delete=False, # boolean autoDelete,該隊列不再使用的時候,是否讓RabbitMQ伺服器自動刪除掉; # arguments=None) """ chann.queue_declare(queue='test_tags', # 聲明 隊列, 不可與 已存在的 隊列重名 , 否則 報錯 durable=True, # 設置隊列 持久化 , 報 : ChannelClosedByBroker: 406 , 錯誤, passive:是屈服的意思,將passive設為True,問題解決。 # passive= True, ) message = "My name is shiwei" chann.basic_publish(exchange='', routing_key='test_tags', # 表明 要將 消息 發送到哪個隊列 body = message, properties = pika.BasicProperties(delivery_mode = 2) # 設置消息持久化, 將消息的屬性設置為 2 ,表示消息持久化 ) print('[Publisher] Send %s' % message) conn.close()
消費者
import pika import time username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) chann = conn.channel() chann.queue_declare(queue='test_tags', # 聲明 隊列, 不可與 已存在的 隊列重名 , 否則 報錯 durable=True, # 設置隊列 持久化 , # passive=True, # 是否檢查當前隊列 是否存在 , True 表示 當前聲明隊列 為 存在 的, ) # 定義 接受消息 的 回調函數 def callback(ch,method, properties, body): print(" [消費者] Received %r" % body) time.sleep(3) print(" [消費者] Done") # 手動 確認 在接收到 消息後 給 rabbitmq 發送一個 確認 ACK, 返回 消息標識符 ch.basic_ack(delivery_tag=method.delivery_tag) """ def basic_consume(self, queue, on_message_callback, auto_ack=False, exclusive=False, consumer_tag=None, arguments=None): """ chann.basic.qos (prefetch_count = 1 ) # 註意 源碼中的 位置參數的位置 chann.basic_consume(queue='test_tags', on_message_callback = callback, # 是否 需要 自動 確認, 若為 False, 則需要在 消息回調函數中手動確認, auto_ack = False, # 預設是 False ) chann.start_consuming() # 開始 迴圈 接受消息
發佈-訂閱_廣播:一對多-------交換機
exchange:交換機。生產者不是將消息發送給隊列,而是將消息發送給交換機,由交換機決定將消息發送給哪個隊列。所以 exchange必須準確知道消息是要送到哪個隊列,還是要被丟棄。因此要在exchange中給exchange定義規則,所有的規 則都是在exchange的類型中定義的。 exchange有4個類型: fanout : 所有bind到此exchange的queue都可以接收消息 direct :通過routingKey和exchange決定的那個唯一的queue可以接收消息 topic :所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息 表達式符號說明:#代表一個或多個字元,*代表任何字元 例:#.a會匹配a.a,aa.a,aaa.a等 *.a會匹配a.a,b.a,c.a等 註:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout headers :通過headers 來決定把消息發給哪些queue 之前,我們並沒有講過exchange,但是我們仍然可以將消息發送到隊列中。這是因為我們用的是預設exchange.也就是 說之前寫的:exchange='',空字元串表示預設的exchange。
exchange_type=fanout
廣播類型,生產者將消息發送給所有消費者,如果某個消費者沒有收到當前消息,就再也收不到了(消費者就像收音機) 生產者:(可以用作日誌收集系統) 開啟多個消費者後,會同時從生產者接收相同的消息
消息publisher
import pika username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) #在連接上創建一個頻道 channel = conn.channel() #創建一個fanout(廣播)類型的交換機exchange,名字為logs。 channel.exchange_declare(exchange="logs", exchange_type="fanout") message = "info: Hello World!" channel.basic_publish(exchange='logs',# 指定交換機exchange為logs,這裡只需要指定將消息發給交換機logs就可以了,不需要指定隊列,因為生產者消息是發送給交換機的。 routing_key='', # 在fanout類型中,綁定關鍵字routing_key必須忽略,寫空即可 body=message) print(" [x] Sent %r" % message) conn.close()
消息subscriber
import pika import sys username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) #在連接上創建一個頻道 channel = conn.channel() channel.exchange_declare(exchange="logs", exchange_type="fanout") # 參數名改變了,以前版本是 type result = channel.queue_declare(exclusive=True, # 創建隨機隊列,exclusive=True(唯一性)當消費者與rabbitmq斷開連接時,這個隊列將自動刪除。 queue='',) queue_name = result.method.queue # 分配隨機隊列的名字。 channel.queue_bind(exchange='logs',# 將交換機、隊列綁定在一起, queue=queue_name,) def callback(ch, method, properties, body): # 定義回調函數,接收消息 print(" [消費者] %r:%r" % (method.routing_key, body)) channel.basic_consume(queue=queue_name, on_message_callback = callback, auto_ack=True) # 消費者接收消息後,不給rabbimq回執確認。 channel.start_consuming() # 迴圈等待消息接收。
exchange_type=direct
RabbitMQ還支持根據關鍵字發送,無需聲明隊列,即:發佈時給隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。 例如: 根據 日誌級別,info, warning, error, success,本例即是。 註意: *****本例 需從命令行啟動,給定參數-------》隊列的綁定關鍵字
消息publisher
import pika import sys username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) #在連接上創建一個頻道 channel = conn.channel() #創建一個交換機並聲明 direct 的類型為:關鍵字類型,表示該交換機會根據消息中不同的關鍵字將消息發送給不同的隊列 channel.exchange_declare(exchange="direct_logs", exchange_type="direct") severity = sys.argv[1] if len(sys.argv) > 1 else "info" message = ' '.join(sys.argv[2:]) or "Hello World!" channel.basic_publish(exchange='direct_logs', # 指明用於發佈消息的交換機、關鍵字 routing_key=severity, # 綁定關鍵字,即將message與關鍵字info綁定,明確將消息發送到哪個關鍵字的隊列中。 body=message) print(" [x] Sent %r" % message) conn.close()
消息subscriber
import pika import sys username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) # 在連接上創建一個頻道 channel = conn.channel() channel.exchange_declare(exchange="direct_logs", exchange_type="direct") # 參數 名改變了, 以前是 type result = channel.queue_declare(exclusive=True, # 創建隨機隊列,當消費者與rabbitmq斷開連接時,這個隊列將自動刪除。 queue='',) queue_name = result.method.queue # 分配隨機隊列的名字。 severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: # 迴圈 隊列, 使其與交換機綁定在一起, channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity,) def callback(ch, method, properties, body): # 定義回調函數,接收消息 print(" [消費者] %r:%r" % (method.routing_key, body)) channel.basic_consume(queue=queue_name, on_message_callback = callback, auto_ack=True,) # 消費者接收消息後,不給rabbimq回執確認。 channel.start_consuming() # 迴圈等待消息接收。
exchange_type=topic---> 模糊匹配類型。比較常用
發送到一個 topics交換機的消息,它的 routing_key不能是任意的 -- 它的routing_key必須是一個用小數點分割的 單詞列表。 這個字元可以是任何單詞,但是通常是一些指定意義的字元。比如: “stock.usd.nyse","nyse.vmw","quick.orange.rabbit". 這裡可以是你想要路由鍵的任意字元。最高限制 為255位元組。 生產者與消費者的routing_key必須在同一個表單中。 Topic交換的背後的邏輯類似直接交換(direct) -- 包含特定關 鍵字的消息將會分發到所有匹配的關鍵字隊列中。然後有兩個重要的特殊情況: 綁定鍵值: > * (星) 可代替一個單詞 > # (井) 可代替0個或多個單詞 註意: *****本例 需從命令行啟動,給定參數-------》隊列的綁定關鍵字
消息publisher
import pika import sys username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) channel = conn.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) conn.close()
消息subscriber
import pika import sys username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) channel = conn.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare(exclusive=True, queue="",) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(on_message_callback = callback, queue=queue_name, auto_ack=True) channel.start_consuming()
To receive all the logs run: python receive_logs_topic.py "#" To receive all logs from the facility "kern": python receive_logs_topic.py "kern.*" Or if you want to hear only about "critical" logs: python receive_logs_topic.py "*.critical" You can create multiple bindings: python receive_logs_topic.py "kern.*" "*.critical" And to emit a log with a routing key "kern.critical" type: python emit_log_topic.py "kern.critical" "A critical kernel error"
exchange_type=topic----例 二 :
生產者
1 import pika 2 import sys 3 4 username = "shiwei" 5 pwd = 'shiwei666666' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 8 # 創建連接 9 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) 10 11 channel = conn.channel() 12 channel.exchange_declare(exchange='topic_logs', 13 exchange_type='topic') # 創建模糊匹配類型的exchange。。 14 15 routing_key = '[warn].kern' # 這裡關鍵字必須為點號隔開的單詞,以便於消費者進行匹配。引申:這裡可以做一個判斷,判斷產生的日誌是什麼級別,然後產生對應的routing_key,使程式可以發送多種級別的日誌 16 message = 'Hello World!' 17 channel.basic_publish(exchange='topic_logs',#將交換機、關鍵字、消息進行綁定 18 routing_key=routing_key, # 綁定關鍵字,將隊列變成[warn]日誌的專屬隊列 19 body=message) 20 print(" [x] Sent %r:%r" % (routing_key, message)) 21 conn.close()
消費者
1 import pika 2 import sys 3 4 username = "shiwei" 5 pwd = 'shiwei666666' 6 user_pwd = pika.PlainCredentials(username, pwd) 7 8 # 創建連接 9 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) 10 11 channel = conn.channel() 12 13 channel.exchange_declare(exchange='topic_logs', 14 exchange_type='topic') # 聲明exchange的類型為模糊匹配。 15 16 result = channel.queue_declare(exclusive=True, 17 queue="",) # 創建隨機一個隊列當消費者退出的時候,該隊列被刪除。 18 queue_name = result.method.queue # 創建一個隨機隊列名字。 19 20 # 綁定鍵。‘#’匹配所有字元,‘*’匹配一個單詞。這裡列表中可以為一個或多個條件,能通過列表中字元匹配到的消息,消費者都可以取到 21 binding_keys = ['[warn].*', 'info.*'] 22 if not binding_keys: 23 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 24 sys.exit(1) 25 26 # 通過迴圈綁定多個“交換機-隊列-關鍵字”,只要消費者在rabbitmq中能匹配到與關鍵字相應的隊列,就從那個隊列里取消息 27 for binding_key in binding_keys: 28 channel.queue_bind(exchange='topic_logs', 29 queue=queue_name, 30 routing_key=binding_key) 31 32 print(' [*] Waiting for logs. To exit press CTRL+C') 33 34 35 def callback(ch, method, properties, body): 36 print(" [x] %r:%r" % (method.routing_key, body)) 37 38 39 channel.basic_consume(on_message_callback=callback, 40 queue=queue_name, 41 auto_ack=True) # 不給rabbitmq發送確認 42 43 channel.start_consuming() # 迴圈接收消息
遠程過程調用(RPC)Remote procedure call
消息屬性 AMQP協議在一個消息中預先定義了一個包含14個屬性的集合。大部分屬性很少用到,以下幾種除外: > delivery_mode: 標記一個消息為持久的(值為2)或者 瞬時的(其它值), 你需要記住這個屬性(在第二課時用到過) > content_type : 用來描述 MIME 類型的編碼 ,比如我們經常使用的 JSON 編碼,設置這個屬性就非常好實現: application/json > reply_to:reply_to沒有特別的意義,只是一個普通的變數名,只是它通常用來命名一個 callback 隊列 > correlation_id : 用來關聯RPC的請求與應答。關聯id的作用:當在一個隊列中接收了一個返回,我們並不清楚這個結果時屬於哪個請求的,這樣當correlation_id屬性使用後,我們為每個請求設置一個唯一值,這個值就是關聯id。這樣,請求會有一個關聯id,該請求的返回結果也有一個相同的關聯id。然後當我們從callback隊列中接收到一個消息後,我們查看一下這個關聯,基於這個我們就能將請求和返回進行匹配。如果我們看到一個未知的correlation_id值,我們可以直接丟棄這個消息 -- 它是不屬於我們的請求。 RPC執行過程: > 當客戶端啟動後,它創建一個匿名的唯一的回調隊列 > 對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來為每個請求設置一個唯一標識) > 請求發送到 rpc_queue隊列 > RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to欄位中的隊列 > 客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,如果和請求中的值匹配將返回給應用
RPC Running Detail: > 當客戶端啟動後,它創建一個匿名的唯一的回調隊列 > 對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來為每個請求設置一個唯一標識) > 請求發送到 rpc_queue隊列 > RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to欄位中的隊列 > 客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,如果和請求中的值匹配將返回給應用
RPC Client
import pika import uuid class FibonacciRpcClient(object): def __init__(self): username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 self.conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) self.channel = self.conn.channel() result = self.channel.queue_declare(exclusive=True, queue= '') # 隨機生成 一個 queue , 用與 Server 發送消息 self.callback_queue = result.method.queue self.channel.basic_consume(on_message_callback = self.on_response, auto_ack = True, # 準備 發送 消息 queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id,), body=str(n)) while self.response is None: self.conn.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(7) print(" [.] Got %r" % response)
Client Running Detail: > (11) 我們建立一個連接,通道並定義一個專門的’callback‘隊列用來接收回覆 > (19) 我們訂閱了“callback”隊列,因此我們能夠接收 RPC 的返回結果 > (21) ’on_response' 在每個返回中執行的回調是一個簡單的job, 對每個返回消息將檢查correlation_id是否是我們需要查找的那個ID,如果是,將保存結果到 self.response 並終端consuming迴圈 > (25) 下一步,我們定義我們的main方法 - 執行實際的RPC請求 > (27) 在這方法中,首先我們生產一個唯一的 correlatin_id 號並保存 -- 'on_response"回調函數將用著號碼來匹配發送和接收的消息值 > (28) 下一步,發佈請求信息,使用兩個屬性: reply_to 和 correlation_id > (34) 這一步我們可以坐等結果的返回 > (36) 最後我們返回結果給用戶
RPC Server
import pika username = "shiwei" pwd = 'shiwei666666' user_pwd = pika.PlainCredentials(username, pwd) # 創建連接 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) channel = conn.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id= props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='rpc_queue', on_message_callback = on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()
Server Running Detail: > 當客戶端啟動後,它創建一個匿名的唯一的回調隊列 > 對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來為每個請求設置一個唯一標識) > 請求發送到 rpc_queue隊列 > RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to欄位中的隊列 > 客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,如果和請求中的值匹配將返回給應用
RPC Demo02
處理方法描述:發送端在發送信息前,產生一個接收消息的臨時隊列,該隊列用來接收返回的結果。其實在這裡接收端、發送端 的概念已經比較模糊了,因為發送端也同樣要接收消息,接收端同樣也要發送消息,所以這裡筆者使用另外的示例來演示這一過程。 示例內容:假設有一個控制中心和一個計算節點,控制中心會將一個自然數N發送給計算節點,計算節點將N值加1後,返回給控 制中心。這裡用center.py模擬控制中心,compute.py模擬計算節點。
Client
1 import pika 2 3 class Center(object): 4 def __init__(self): 5 username = "shiwei" 6 pwd = 'shiwei666666' 7 user_pwd = pika.PlainCredentials(username, pwd) 8 9 # 創建連接 10 self.conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) 11 12 self.channel = self.conn.channel() 13 # 定義接收返回消息的隊列 14 result = self.channel.queue_declare(exclusive=True,queue="",) 15 self.callback_queue = result.method.queue 16 17 self.channel.basic_consume(on_message_callback=self.on_response, 18 auto_ack=True, 19 queue=self.callback_queue) 20 21 # 定義接收到返回消息的處理方法 22 def on_response(self, ch, method, props, body): 23 self.response = body 24 25 def request(self, n): 26 self.response = None 27 # 發送計算請求,並聲明返回隊列 28 self.channel.basic_publish(exchange='', 29 routing_key='compute_queue', 30 properties=pika.BasicProperties( 31 reply_to=self.callback_queue, 32 ), 33 body=str(n)) 34 # 接收返回的數據 35 while self.response is None: 36 self.conn.process_data_events() 37 return int(self.response) 38 39 center = Center() 40 41 print(" [x] Requesting increase(30)") 42 response = center.request(30) 43 print(" [.] Got %r" % (response,))Client
Server
1 import pika 2 3 username = "shiwei" 4 pwd = 'shiwei666666' 5 user_pwd = pika.PlainCredentials(username, pwd) 6 # 創建連接 7 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd)) 8 channel = conn.channel() 9 10 print(' [*] Waiting for n') 11 channel.queue_declare(queue='compute_queue') 12 13 # 將n值加1 14 def increase(n): 15 return n + 1 16 17 # 定義接收到消息的處理方法 18 def request(ch, method, properties, body): 19 print(" [.] increase(%s)" % (body,)) 20 21 response = increase(int(body)) 22 23 # 將計算結果發送回控制中心 24 ch.basic_publish(exchange='', 25 routing_key=properties.reply_to, 26 body=str(response)) 27 ch.basic_ack(delivery_tag=method.delivery_tag) 28 29 channel.basic_qos(prefetch_count=1) 30 channel.basic_consume(on_message_callback=request, 31 queue='compute_queue') 32 33 channel.start_consuming()Server
參考文章:http://www.rabbitmq.com/tutorials/tutorial-six-python.html