RabbitMQ 全套

来源:https://www.cnblogs.com/shiwei1930/archive/2019/07/01/11116250.html
-Advertisement-
Play Games

本博客代碼運行環境 什麼是MQ 本教程pdf及代碼下載地址:代碼:https://download.csdn.net/download/zpcandzhj/10585077教程:https://download.csdn.net/download/zpcandzhj/10585092 RabbitM ...


本博客代碼運行環境

     ErLang:      ErLang_X64_22 version
     RabbitMQ:    RabbitMQ_Server_3.7.15 version
     python :     Python 3.7.1rc1   version
     pip :       pip 19.1.1  version
     pika :       pika 1.0.1 version

什麼是MQ

消息隊列(Message Queue,簡稱MQ),從字面意思上看,本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是message而已。
其主要用途:不同進程Process/線程Thread之間通信。
為什麼會產生消息隊列?有幾個原因:

不同進程(process)之間傳遞消息時,兩個進程之間耦合程度過高,改動一個進程,引發必須修改另一個進程,為了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),所有兩進程之間傳遞的消息,都必須通過消息隊列來傳遞,單獨修改某一個進程,不會影響另一個;

不同進程(process)之間傳遞消息時,為了實現標準化,將消息的格式規範化了,並且,某一個進程接受的消息太多,一下子無法處理完,並且也有先後順序,必須對收到的消息進行排隊,因此誕生了事實上的消息隊列;

關於消息隊列的詳細介紹請參閱:
《Java帝國之消息隊列》
《一個故事告訴你什麼是消息隊列》
《到底什麼時候該使用MQ》

MQ框架非常之多,比較流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿裡開源的RocketMQ。本文主要介紹RabbitMq。

 本教程pdf及代碼下載地址
代碼:https://download.csdn.net/download/zpcandzhj/10585077
教程:https://download.csdn.net/download/zpcandzhj/10585092

 

RabbitMQ

RabbitMQ簡介

    
    1、MQ 為 Message Queue, 消息隊列是應用程式和應用程式之間的通信方法,
    2、RabbitMQ 是一個開源的, 在AMQP 基礎上完整的,可復用的企業消息系統,消息中間件 , 消息隊列
    3、支持主流的 OS, Linux, Windows, MacOX 等,
    4、多種開發語言支持, java、Python、Ruby、.NET、PHP、C/C++、node.js 等
    5、是專業級別的, 甩 python 的隊列好幾條街
    6、開發語言: Erlang----面向併發的編程語言----- 愛立信公司, 可以做到 熱插拔, 局部載入, 不需要重啟整個服務
 
   AMQP: 消息隊列的一個協議。

搭建RabbitMQ環境:windows下安裝

第一步 提供 Erlang 編程語言環境-----》安裝 Erlang

官網: http://www.erlang.org/download ,

安裝RabbitMQ

官網: https://www.rabbitmq.com/install-windows.html

安裝完成: 打開 cmd 命令行工具, cd 到 RabbitMQ 的安裝目錄下的 sbin/ 子目錄 中。 如圖:

1、啟動管理工具插件: rabbitmq-plugins enable rabbitmq_management

2、啟動 RabbitMQ 服務:net start rabbitmq

3、瀏覽器輸入地址: http://127.0.0.1:15672/

    4、使用 預設賬號管理: guest/ guest  , 能夠登陸 ,說明安裝成功

    4.1、 添加 admin 用戶:

     4.2、用戶角色: 

   1、超級管理員(administrator)
      可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。
   2、監控者(monitoring)
      可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,記憶體使用情況,磁碟使用情況等)
   3、策略制定者(policymaker)
     可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。
   4、普通管理者(management)
      僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
   5、其他
      無法登陸管理控制台,通常就是普通的生產者和消費者。

 

     4.3、創建Virtual Hosts

virtual hosts

        選中Admin用戶,設置許可權:

permissions

     看到許可權已加

look

4.4、 管理界面中的功能: 

vir

shuo

   4.5、 管理工具中 查看隊列消息:

    點擊上面的隊列名稱,查詢具體的隊列中的信息:

 

使用 rabbitMQ 命令 添加用戶並設置許可權的步驟:

   1. 創建用戶:      rabbitmqctl add_user name password
   2. 設置用戶角色:   rabbitmqctl set_user_tags name administrator
   3. 遠程連接需設置用戶許可權, 代表允許從外面訪問: 
      rabbitmqctl set_permissions -p / name ".*" ".*" ".*"
    解析:          set_permissions [-p vhost] {user} {conf} {write} {read}

   RabbitMQ 常用命令: 

 1 1、rabbitmq 管理器插件的啟動和關閉:
 2     **啟動監控管理器:rabbitmq-plugins enable rabbitmq_management
 3     關閉監控管理器:rabbitmq-plugins disable rabbitmq_management
 4 2、服務的啟動與關閉:
 5      **啟動rabbitmq:rabbitmq-service start
 6      關閉rabbitmq:rabbitmq-service stop
 7      **使用 windows 命令: net start rabbitmq
 8                                  net stop rabbitmq
 9 3、rabbitmq伺服器的啟動和關閉:
10     前臺啟動: rabbitmq-server start
11     後臺啟動: rabbitmq-server -detached
12     前臺停止:rabbitmqctl stop
13     查看 RabbitMQ 的狀態: rabbitmqctl status
14 4、rabbitmq 應用管理: 
15     關閉應用:rabbitmqctl stop_app
16     啟動應用:rabbitmqctl start_app
17 5、用戶管理: 
18     **添加用戶: rabbitmqctl add_user username password
19     列出所有用戶: rabbitmqctl list_users
20     刪除用戶: rabbitmqctl delete_user username 
21     **修改用戶密碼: rabbitmqctl change_password username 
22  newpassword
23 6、角色管理: 
24     **分配角色:rabbitmqctl set_user_tags username administrator
25 
26      角色說明
27              none 最小許可權角色
28              management 管理員角色
29              policymaker 決策者
30              monitoring 監控
31              administrator 超級管理員
32 7. 許可權管理: 
33    清除用戶許可權: rabbitmqctl clear_permissions -p vhostpath user
34    **設置用戶許可權: rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
35 8、虛擬主機管理:
36     列出所有虛擬主機: rabbitmqctl list_vhosts
37     創建虛擬主機:      rabbitmqctl list_vhost vhostpath
38     刪除虛擬主機:     rabbitmqctl delete_vhost vhostpath
39     列出虛擬主機所有許可權:rabbitmqctl list_permissions -p vhostpath
40 9、隊列管理:
41    **查看所有的隊列:rabbitmqctl list_queues
42    清除所有的隊列:rabbitmqctl reset
43    查看所有綁定:    rabbitmqctl list_bindings
44    查看所有通道:    rabbitmqctl list_channels
45    查看所有連接:    rabbitmqctl list_connections
46    列出所有消費者: rabbitmqctl list_consumers
47    **列出所有交換機: rabbitmqctl list_exchanges

 

Python 操作 RabbitMQ 之深入淺出

        此博客代碼托管地址:  https://github.com/SuoSuo-Rocky/RabbitMQ-FullStack

安裝 rabbitMQ module

 
 pip install pika 
 or 
 easy_install pika
 or 
 源碼 : https://pypi.python.org/pypi/pika 

實現最簡單的隊列通信

 

send端

 1 #  發送端, 消費者
 2 import pika
 3 
 4 credentials = pika.PlainCredentials('shiwei', 'shiwei666666')
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
 6 
 7 # 在連接之上創建一個 rabbit 協議的通道
 8 channel = connection.channel()
 9 
10 # 在通道中 聲明 一個 queue
11 channel.queue_declare(queue='hello')
12 
13 # 一個消息永遠不能直接發送到隊列,它總是需要經過一個交換
14 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
15 channel.basic_publish(exchange='',  # 交換機
16                       routing_key='hello',   # 路由鍵,寫明將消息發往哪個隊列,本例是將消息發往隊列hello
17                       body='Hello World!')   # 生產者要發送的消息 內容
18 print(" [x] Sent 'Hello World!'")
19 connection.close()  # 當生產者發送完消息後,可選擇關閉連接

receive端

 

import pika
import time
credentials = pika.PlainCredentials('shiwei', 'shiwei666666')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
# 聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另一方能正常運行
channel.queue_declare(queue='hello')

# 定義一個回調函數,用來接收生產者發送的消息
def callback(ch, method, properties, body):
    print("received msg...start processing....",body)
    time.sleep(5)
    print(" [x] msg process done....",body)

channel.basic_consume(on_message_callback=callback, # 定義一個回調函數,用來接收生產者發送的消息
                      auto_ack=True,
                      queue='hello',            # 指定取消息的隊列名
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()      #開始迴圈取消息

實現功能

   (1)rabbitmq迴圈調度公平分發,將消息迴圈發送給不同的消費者,如:消息1,3,5發送給消費者1;消息2,4,6發送給消費者2。
   (2)消息確認機制,為了確保一個消息不會丟失,RabbitMQ支持消息的確認 , 一個 ack(acknowlegement) 是從消費者端發送一個確認去告訴RabbitMQ 消息已經接收了、處理了,RabbitMQ可以釋放並刪除掉了。如果一個消費者死掉了(channel關閉、connection關閉、或者TCP連接斷開了)而沒有發送ack,RabbitMQ 就會認為這個消息沒有被消費者處理,並會重新發送到生產者的隊列里,如果同時有另外一個消費者線上,rabbitmq將會將消息很快轉發到另外一個消費者中。 那樣的話你就能確保雖然一個消費者死掉,但消息不會丟失。
     這個是沒有超時的,當消費方(consumer)死掉後RabbitMQ會重新轉發消息,即使處理這個消息需要很長很長時間也沒有問題。消息的 acknowlegments 預設是打開的,在前面的例子中關閉了: auto_ack = True . 現在刪除這個標識 然後 發送一個 acknowledgment。

消息持久化

 消息持久化,將消息寫入硬碟中。
       註意: (1)、RabbitMQ不允許你重新定義一個已經存在、但屬性不同的queue, 否則報錯
            (2)、標記消息為持久化並不能完全保證消息不會丟失,儘管已經告訴RabbitMQ將消息保存到磁碟,但 
       RabbitMQ接收到的消息在還沒有保存的時候,仍然有一個短暫的時間視窗。RabbitMQ不會對每個消息都執行同步 - 
      -- 可能只是保存到緩存cache還沒有寫入到磁碟中。因此這個持久化保證並不是很強,但這比我們簡單的任務queue要 
      好很多,如果想要很強的持久化保證,可以使用 publisher confirms。
     公平調度: 在一個消費者未處理完一個消息之前不要分發新的消息給它,而是將這個新消息分發給另一個不是很忙的消費 
               者進行處理。為瞭解決這個問題我們可以在消費者代碼中使用 channel.basic.qos (prefetch_count = 1 ),將消費者設置為公平調度

生產者

  

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 創建連接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
chann = conn.channel()

# 源碼:
"""
#     def queue_declare(self,            # channel.queueDeclare 用來創建隊列,有5個參數:
#                       queue,           # String queue, 隊列名;
#                       passive=False,   # 
#                       durable=False,   # boolean durable, 該隊列是否需要持久化
#                       exclusive=False, # boolean exclusive,該隊列是否為該通道獨占的(其他通道是否可以消費該隊列)
#                       auto_delete=False, # boolean autoDelete,該隊列不再使用的時候,是否讓RabbitMQ伺服器自動刪除掉;
#                       arguments=None)

"""
chann.queue_declare(queue='test_tags', # 聲明 隊列, 不可與 已存在的 隊列重名 , 否則 報錯
                    durable=True,      # 設置隊列 持久化 , 報 : ChannelClosedByBroker: 406 , 錯誤, passive:是屈服的意思,將passive設為True,問題解決。
                    # passive= True,
                    )
message = "My name is shiwei"

chann.basic_publish(exchange='',
                    routing_key='test_tags',                              # 表明 要將 消息 發送到哪個隊列
                    body = message,
                    properties = pika.BasicProperties(delivery_mode = 2)  # 設置消息持久化, 將消息的屬性設置為 2 ,表示消息持久化
                    )

print('[Publisher] Send %s' % message)
conn.close()

消費者

import pika
import time

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 創建連接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

chann = conn.channel()

chann.queue_declare(queue='test_tags',  # 聲明 隊列, 不可與 已存在的 隊列重名 , 否則 報錯
                    durable=True,       # 設置隊列 持久化 ,
                    # passive=True,     # 是否檢查當前隊列 是否存在 , True 表示 當前聲明隊列 為 存在 的,
                    )
# 定義 接受消息 的 回調函數
def callback(ch,method, properties, body):
    print(" [消費者] Received %r" % body)
    time.sleep(3)
    print(" [消費者] Done")
    # 手動 確認  在接收到 消息後 給 rabbitmq 發送一個 確認 ACK, 返回 消息標識符
    ch.basic_ack(delivery_tag=method.delivery_tag)
"""
    def basic_consume(self,
                      queue,
                      on_message_callback,
                      auto_ack=False,
                      exclusive=False,
                      consumer_tag=None,
                      arguments=None):
"""
chann.basic.qos (prefetch_count = 1 )
# 註意 源碼中的 位置參數的位置
chann.basic_consume(queue='test_tags',
                    on_message_callback = callback,
                    # 是否 需要 自動 確認, 若為 False, 則需要在 消息回調函數中手動確認,
                    auto_ack = False,  # 預設是  False
                    )

chann.start_consuming()  # 開始 迴圈 接受消息

發佈-訂閱_廣播:一對多-------交換機

 
exchange:交換機。生產者不是將消息發送給隊列,而是將消息發送給交換機,由交換機決定將消息發送給哪個隊列。所以 
     exchange必須準確知道消息是要送到哪個隊列,還是要被丟棄。因此要在exchange中給exchange定義規則,所有的規 
      則都是在exchange的類型中定義的。
exchange有4個類型:
     fanout : 所有bind到此exchange的queue都可以接收消息
     direct :通過routingKey和exchange決定的那個唯一的queue可以接收消息
     topic  :所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
          表達式符號說明:#代表一個或多個字元,*代表任何字元
              例:#.a會匹配a.a,aa.a,aaa.a等
                 *.a會匹配a.a,b.a,c.a等
                 註:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout 
    headers :通過headers 來決定把消息發給哪些queue
   之前,我們並沒有講過exchange,但是我們仍然可以將消息發送到隊列中。這是因為我們用的是預設exchange.也就是 
        說之前寫的:exchange='',空字元串表示預設的exchange。

exchange_type=fanout

廣播類型,生產者將消息發送給所有消費者,如果某個消費者沒有收到當前消息,就再也收不到了(消費者就像收音機)
           生產者:(可以用作日誌收集系統)
           開啟多個消費者後,會同時從生產者接收相同的消息

fanout


消息publisher

import pika

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 創建連接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

#在連接上創建一個頻道
channel = conn.channel()

#創建一個fanout(廣播)類型的交換機exchange,名字為logs。
channel.exchange_declare(exchange="logs", exchange_type="fanout")
message =  "info: Hello World!"
channel.basic_publish(exchange='logs',# 指定交換機exchange為logs,這裡只需要指定將消息發給交換機logs就可以了,不需要指定隊列,因為生產者消息是發送給交換機的。
                      routing_key='', # 在fanout類型中,綁定關鍵字routing_key必須忽略,寫空即可
                      body=message)
print(" [x] Sent %r" % message)
conn.close()

消息subscriber

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 創建連接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

#在連接上創建一個頻道
channel = conn.channel()

channel.exchange_declare(exchange="logs", exchange_type="fanout")  # 參數名改變了,以前版本是 type

result = channel.queue_declare(exclusive=True,  # 創建隨機隊列,exclusive=True(唯一性)當消費者與rabbitmq斷開連接時,這個隊列將自動刪除。
                               queue='',)

queue_name = result.method.queue # 分配隨機隊列的名字。
channel.queue_bind(exchange='logs',# 將交換機、隊列綁定在一起,
                   queue=queue_name,)

def callback(ch, method, properties, body):   #  定義回調函數,接收消息
    print(" [消費者] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name,
                      on_message_callback = callback,
                      auto_ack=True) # 消費者接收消息後,不給rabbimq回執確認。

channel.start_consuming() # 迴圈等待消息接收。

exchange_type=direct

   RabbitMQ還支持根據關鍵字發送,無需聲明隊列,即:發佈時給隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 
        關鍵字 判定應該將數據發送至指定隊列。
        例如: 根據 日誌級別,info, warning, error, success,本例即是。 
   註意: *****本例 需從命令行啟動,給定參數-------》隊列的綁定關鍵字
 

direct

消息publisher

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 創建連接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

#在連接上創建一個頻道
channel = conn.channel()

#創建一個交換機並聲明 direct 的類型為:關鍵字類型,表示該交換機會根據消息中不同的關鍵字將消息發送給不同的隊列
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

severity = sys.argv[1] if len(sys.argv) > 1 else "info"

message = ' '.join(sys.argv[2:]) or "Hello World!"

channel.basic_publish(exchange='direct_logs', # 指明用於發佈消息的交換機、關鍵字
                      routing_key=severity,   # 綁定關鍵字,即將message與關鍵字info綁定,明確將消息發送到哪個關鍵字的隊列中。
                      body=message)
print(" [x] Sent %r" % message)
conn.close()

消息subscriber

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 創建連接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

# 在連接上創建一個頻道
channel = conn.channel()

channel.exchange_declare(exchange="direct_logs", exchange_type="direct")  # 參數 名改變了, 以前是 type

result = channel.queue_declare(exclusive=True,  # 創建隨機隊列,當消費者與rabbitmq斷開連接時,這個隊列將自動刪除。
                               queue='',)
queue_name = result.method.queue              # 分配隨機隊列的名字。

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:                   # 迴圈 隊列, 使其與交換機綁定在一起,
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity,)

def callback(ch, method, properties, body):   # 定義回調函數,接收消息
    print(" [消費者] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name,
                      on_message_callback = callback,
                      auto_ack=True,)         # 消費者接收消息後,不給rabbimq回執確認。

channel.start_consuming()                     # 迴圈等待消息接收。

  

exchange_type=topic---> 模糊匹配類型。比較常用

 
  發送到一個 topics交換機的消息,它的 routing_key不能是任意的 -- 它的routing_key必須是一個用小數點分割的 
      單詞列表。 這個字元可以是任何單詞,但是通常是一些指定意義的字元。比如: 
      “stock.usd.nyse","nyse.vmw","quick.orange.rabbit".  這裡可以是你想要路由鍵的任意字元。最高限制 
       為255位元組。
  生產者與消費者的routing_key必須在同一個表單中。 Topic交換的背後的邏輯類似直接交換(direct) -- 包含特定關 
      鍵字的消息將會分發到所有匹配的關鍵字隊列中。然後有兩個重要的特殊情況:
      綁定鍵值:
        > * (星)  可代替一個單詞
        > # (井) 可代替0個或多個單詞 
       註意: *****本例 需從命令行啟動,給定參數-------》隊列的綁定關鍵字

topic

消息publisher

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 創建連接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

channel = conn.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
conn.close()

消息subscriber

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 創建連接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

channel = conn.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True,
                               queue="",)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(on_message_callback = callback,
                      queue=queue_name,
                      auto_ack=True)

channel.start_consuming()
To receive all the logs run:
   python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
   python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:
   python receive_logs_topic.py "*.critical"
You can create multiple bindings:
   python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
   python emit_log_topic.py "kern.critical" "A critical kernel error"

exchange_type=topic----例 二 :

生產者

 1 import pika
 2 import sys
 3 
 4 username = "shiwei"
 5 pwd = 'shiwei666666'
 6 user_pwd = pika.PlainCredentials(username, pwd)
 7 
 8 # 創建連接
 9 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
10 
11 channel = conn.channel()
12 channel.exchange_declare(exchange='topic_logs',
13                          exchange_type='topic')  # 創建模糊匹配類型的exchange。。
14 
15 routing_key = '[warn].kern' # 這裡關鍵字必須為點號隔開的單詞,以便於消費者進行匹配。引申:這裡可以做一個判斷,判斷產生的日誌是什麼級別,然後產生對應的routing_key,使程式可以發送多種級別的日誌
16 message =  'Hello World!'
17 channel.basic_publish(exchange='topic_logs',#將交換機、關鍵字、消息進行綁定
18                       routing_key=routing_key,  # 綁定關鍵字,將隊列變成[warn]日誌的專屬隊列
19                       body=message)
20 print(" [x] Sent %r:%r" % (routing_key, message))
21 conn.close()

 

消費者

 1 import pika
 2 import sys
 3 
 4 username = "shiwei"
 5 pwd = 'shiwei666666'
 6 user_pwd = pika.PlainCredentials(username, pwd)
 7 
 8 # 創建連接
 9 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
10 
11 channel = conn.channel()
12 
13 channel.exchange_declare(exchange='topic_logs',
14                          exchange_type='topic')  # 聲明exchange的類型為模糊匹配。
15 
16 result = channel.queue_declare(exclusive=True,
17                                queue="",)        # 創建隨機一個隊列當消費者退出的時候,該隊列被刪除。
18 queue_name = result.method.queue                 # 創建一個隨機隊列名字。
19 
20 # 綁定鍵。‘#’匹配所有字元,‘*’匹配一個單詞。這裡列表中可以為一個或多個條件,能通過列表中字元匹配到的消息,消費者都可以取到
21 binding_keys = ['[warn].*', 'info.*']
22 if not binding_keys:
23     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
24     sys.exit(1)
25 
26 # 通過迴圈綁定多個“交換機-隊列-關鍵字”,只要消費者在rabbitmq中能匹配到與關鍵字相應的隊列,就從那個隊列里取消息
27 for binding_key in binding_keys:
28     channel.queue_bind(exchange='topic_logs',
29                        queue=queue_name,
30                        routing_key=binding_key)
31 
32 print(' [*] Waiting for logs. To exit press CTRL+C')
33 
34 
35 def callback(ch, method, properties, body):
36     print(" [x] %r:%r" % (method.routing_key, body))
37 
38 
39 channel.basic_consume(on_message_callback=callback,
40                       queue=queue_name,
41                       auto_ack=True)       # 不給rabbitmq發送確認
42 
43 channel.start_consuming()                  # 迴圈接收消息

遠程過程調用(RPC)Remote procedure call

消息屬性
AMQP協議在一個消息中預先定義了一個包含14個屬性的集合。大部分屬性很少用到,以下幾種除外:
  > delivery_mode: 標記一個消息為持久的(值為2)或者 瞬時的(其它值), 你需要記住這個屬性(在第二課時用到過)
  > content_type : 用來描述 MIME 類型的編碼 ,比如我們經常使用的 JSON 編碼,設置這個屬性就非常好實現: application/json
  > reply_to:reply_to沒有特別的意義,只是一個普通的變數名,只是它通常用來命名一個 callback 隊列
  > correlation_id : 用來關聯RPC的請求與應答。關聯id的作用:當在一個隊列中接收了一個返回,我們並不清楚這個結果時屬於哪個請求的,這樣當correlation_id屬性使用後,我們為每個請求設置一個唯一值,這個值就是關聯id。這樣,請求會有一個關聯id,該請求的返回結果也有一個相同的關聯id。然後當我們從callback隊列中接收到一個消息後,我們查看一下這個關聯,基於這個我們就能將請求和返回進行匹配。如果我們看到一個未知的correlation_id值,我們可以直接丟棄這個消息 -- 它是不屬於我們的請求。
RPC執行過程:
    >  當客戶端啟動後,它創建一個匿名的唯一的回調隊列
    > 對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來為每個請求設置一個唯一標識)
    > 請求發送到 rpc_queue隊列
    > RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to欄位中的隊列
    > 客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,如果和請求中的值匹配將返回給應用

rpc

RPC Running Detail:
    >  當客戶端啟動後,它創建一個匿名的唯一的回調隊列
    > 對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來為每個請求設置一個唯一標識)
    > 請求發送到 rpc_queue隊列
    > RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to欄位中的隊列
    > 客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,如果和請求中的值匹配將返回給應用

RPC Client

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        username = "shiwei"
        pwd = 'shiwei666666'
        user_pwd = pika.PlainCredentials(username, pwd)

        # 創建連接
        self.conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

        self.channel = self.conn.channel()

        result = self.channel.queue_declare(exclusive=True, queue= '')  # 隨機生成 一個 queue , 用與 Server 發送消息
        self.callback_queue = result.method.queue

        self.channel.basic_consume(on_message_callback = self.on_response, auto_ack = True,  # 準備 發送 消息
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                                reply_to=self.callback_queue,
                                                correlation_id=self.corr_id,),
                                   body=str(n))
        while self.response is None:
            self.conn.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(7)
print(" [.] Got %r" % response)
Client Running Detail:
   > (11) 我們建立一個連接,通道並定義一個專門的’callback‘隊列用來接收回覆
   > (19) 我們訂閱了“callback”隊列,因此我們能夠接收 RPC 的返回結果
   > (21) ’on_response'  在每個返回中執行的回調是一個簡單的job, 對每個返回消息將檢查correlation_id是否是我們需要查找的那個ID,如果是,將保存結果到 self.response 並終端consuming迴圈
   > (25) 下一步,我們定義我們的main方法 - 執行實際的RPC請求
   > (27) 在這方法中,首先我們生產一個唯一的 correlatin_id 號並保存 -- 'on_response"回調函數將用著號碼來匹配發送和接收的消息值
   > (28) 下一步,發佈請求信息,使用兩個屬性: reply_to 和 correlation_id
   > (34) 這一步我們可以坐等結果的返回
   > (36) 最後我們返回結果給用戶

RPC Server

import pika

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 創建連接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

channel = conn.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback = on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
Server Running Detail:
    >  當客戶端啟動後,它創建一個匿名的唯一的回調隊列
    >  對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來為每個請求設置一個唯一標識)
    >  請求發送到 rpc_queue隊列
    >  RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to欄位中的隊列
    >  客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,如果和請求中的值匹配將返回給應用

RPC Demo02

處理方法描述:發送端在發送信息前,產生一個接收消息的臨時隊列,該隊列用來接收返回的結果。其實在這裡接收端、發送端 
    的概念已經比較模糊了,因為發送端也同樣要接收消息,接收端同樣也要發送消息,所以這裡筆者使用另外的示例來演示這一過程。
示例內容:假設有一個控制中心和一個計算節點,控制中心會將一個自然數N發送給計算節點,計算節點將N值加1後,返回給控 
    制中心。這裡用center.py模擬控制中心,compute.py模擬計算節點。

Client

 1 import pika
 2 
 3 class Center(object):
 4     def __init__(self):
 5         username = "shiwei"
 6         pwd = 'shiwei666666'
 7         user_pwd = pika.PlainCredentials(username, pwd)
 8 
 9         # 創建連接
10         self.conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
11 
12         self.channel = self.conn.channel()
13         # 定義接收返回消息的隊列
14         result = self.channel.queue_declare(exclusive=True,queue="",)
15         self.callback_queue = result.method.queue
16 
17         self.channel.basic_consume(on_message_callback=self.on_response,
18                                    auto_ack=True,
19                                    queue=self.callback_queue)
20 
21     # 定義接收到返回消息的處理方法
22     def on_response(self, ch, method, props, body):
23         self.response = body
24 
25     def request(self, n):
26         self.response = None
27         # 發送計算請求,並聲明返回隊列
28         self.channel.basic_publish(exchange='',
29                                    routing_key='compute_queue',
30                                    properties=pika.BasicProperties(
31                                        reply_to=self.callback_queue,
32                                    ),
33                                    body=str(n))
34         # 接收返回的數據
35         while self.response is None:
36             self.conn.process_data_events()
37         return int(self.response)
38 
39 center = Center()
40 
41 print(" [x] Requesting increase(30)")
42 response = center.request(30)
43 print(" [.] Got %r" % (response,))
Client

Server

 1 import pika
 2 
 3 username = "shiwei"
 4 pwd = 'shiwei666666'
 5 user_pwd = pika.PlainCredentials(username, pwd)
 6 # 創建連接
 7 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
 8 channel = conn.channel()
 9 
10 print(' [*] Waiting for n')
11 channel.queue_declare(queue='compute_queue')
12 
13 # 將n值加1
14 def increase(n):
15     return n + 1
16 
17 # 定義接收到消息的處理方法
18 def request(ch, method, properties, body):
19     print(" [.] increase(%s)" % (body,))
20 
21     response = increase(int(body))
22 
23     # 將計算結果發送回控制中心
24     ch.basic_publish(exchange='',
25                      routing_key=properties.reply_to,
26                      body=str(response))
27     ch.basic_ack(delivery_tag=method.delivery_tag)
28 
29 channel.basic_qos(prefetch_count=1)
30 channel.basic_consume(on_message_callback=request,
31                       queue='compute_queue')
32 
33 channel.start_consuming()
Server

參考文章:http://www.rabbitmq.com/tutorials/tutorial-six-python.html

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 記憶秘訣:BBDEH OPRLM TLSUV 寶貝的恩惠 歐派入聯盟 偷了suv,19 目錄 英文釋義 簡寫 詳解 1 / 根目錄 整個文件系統的唯一根目錄 2 /bin Binary 普通命令目錄 存放常用的系統命令 3 /boot Boot 開機引導目錄 包括Linux內核文件與開機所需要的文件 ...
  • [toc] linux許可權管理 特殊許可權 一,特殊許可權 1.suid(4000) SetUID(suid):會在屬主許可權位的執行許可權上寫個s 如果該屬主許可權位上有執行許可權,則:s 如果該屬主許可權位上沒有執行許可權,則:S 授權方式 setuid總結: ​ 1.讓普通用戶對可執行的二進位文件,臨時擁有二 ...
  • [toc] Linux許可權管理 特殊許可權 一、Linux系統特殊許可權概述 除了r(讀)、 w(寫)、 x(執行)這三種普通許可權外,在查詢系統文件許可權時會發現還有其他的許可權字母。 二、特殊許可權suid介紹 1. 許可權(4000) 在Linux系統中,每個普通用戶都可以更改自己的密碼,這是合理的設置,問 ...
  • 一、rpm 簡介 這是一個資料庫管理工具,可以通過讀取資料庫,判斷軟體是否已經安裝,如果已經安裝可以讀取出來所有文件的所在位置等,並可以實現刪除這些文件。 rpm:RPM is Redhat Package Manager(遞歸縮寫) rpm可以完成的操作 安裝軟體 卸載軟體 查詢軟體信息 升級、降 ...
  • 參考文章:https://www.jianshu.com/p/97c35d569aa3 因為Ubuntu自帶的源伺服器在國外,下載和更新軟體的時候速度很慢,不穩定,所以需要將源更新為國內的源。國內的源比較多,常用的就是阿裡的源。 1.備份原有源文件,方便在替換出問題時回滾 2.更改sources.l ...
  • 作用 實現對不同伺服器時間的同步校準 NTP時間服務 第一步 安裝 第二步 設置 進入配置文件 然後刪除裡面的所有內容。並插入以下代碼 第三部 重啟NTP服務 第四部 檢查NTP服務狀態 第五部 客戶端下載NTP客戶端服務 第六步 啟動測試 [root@localhost ~]# date -s " ...
  • 一 Kubernetes證書 1.1 TLS Kubernetes系統的各個組件需要使用TLS證書對其通信加密以及授權認證,建議在部署之前先生成相關的TLS證書。 1.2 CA證書創建方式 kubernetes 系統各個組件需要使用TLS證書對通信進行加密,通常可通過以下工具生產自建證書: open ...
  • 顯示屏幕(LCD)模塊提供屏幕相關功能介面,調用者為上層應用模塊(含 init、狀態機、ui),上下文依賴關係,如圖 3 7 所示。 系統框架為 linux+Huawei LiteOS 雙系統架構,媒體業務部署在 Huawei LiteOS 端,為了上電快速預覽,需要屏幕需部署在 Huawei Li ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...