python之RabbitMQ

来源:http://www.cnblogs.com/ernest-zhang/archive/2016/07/30/5720029.html
-Advertisement-
Play Games

RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。 MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通信方法。應用程式通過讀寫出入隊列的消息(針對應用程式的數據)來通信,而無需專用連接來鏈 ...


RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。 MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通信方法。應用程式通過讀寫出入隊列的消息(針對應用程式的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程式之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程式通過 隊列來通信。隊列的使用除去了接收和發送應用程式同時執行的要求。   安裝RabbitMQ:
安裝配置epel源:(詳見http://www.cnblogs.com/ernest-zhang/p/5714434.html)
安裝erlang:
yum -y install erlang
註:安裝erlang的時候碰到
    Error: Package: erlang-erts-R14B-04.3.el6.i686 (epel)
           Requires: libz.so.1(ZLIB_1.2.2)
[root@localhost ~]# yum whatprovides libz.so.1
Loaded plugins: rhnplugin
This system is not registered with RHN.
RHN support will be disabled.
zlib-1.2.3-25.el6.i686 : The zlib compression and decompression library #提供壓縮與解壓縮庫
Repo        : local
Matched from:
Other       : libz.so.1
檢查發現應該是zlib的版本太老了,從網上下載最新的zlib-1.2.8-10.fc24.i686,然後使用RPM安裝後解決。
下載地址:http://www.zlib.net/  #zlib官網
http://rpmfind.net/linux/rpm2html/search.php?query=zlib    #zlib下載網站
安裝rabbitMQ:
yum -y install rabbitmq-server
service rabbitmq-server start/stop 啟動和停止rabbitmq 安裝API,然後可以基於API操作rabbitmq
pip install pika
or
easy_install pika
or
源碼
 
https://pypi.python.org/pypi/pika
Python 操作RabbitMQ 發佈端:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))    #伺服器地址
channel=connection.channel()
channel.queue_declare(queue='Hi')   #如果有隊列,略過;如果沒有,創建隊列
channel.basic_publish(exchange='',routing_key='cc',body='hello!world!!!')
print("[x] sent 'hello,world!'")
connection.close()

接收端:

import pika
#創建一個連接對象,綁定rabbitmq的IP
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
#創建一個頻道對象
channel=connection.channel()
#頻道中聲明指定queue,如果MQ中沒有指定queue就創建,如果有,則略過
channel.queue_declare(queue='Hi')
#定義回調函數
def callback(ch,method,properties,body):
    print('[x] Recieved %r'%body)
    # channel.close()
#no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq,callback:回調函數,queue:指定隊列
channel.basic_consume(callback,queue='Hi',no_ack=True)
# channel.basic_consume(callback,queue='cc')
print('[*] Waiting for msg')
channel.start_consuming()

1、acknowledgment 消息不丟失

no-ack = False,如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會重新將該任務添加到隊列中。

  • 回調函數中的ch.basic_ack(delivery_tag=method.delivery_tag)
  • basic_comsume中的no_ack=False
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')
# 定義回調函數
def callback(ch, method, properties, body):
    print('[x] Recieved %r' % body)
    # channel.close()
    ch.basic_ack(delivery_tag=method.delivery_tag)
# no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq
channel.basic_consume(callback, queue='Hi',
                      no_ack=False)  
print('[*] Waiting for msg')
channel.start_consuming()
View Code

durable 消息不丟失

消息生產者端發送消息時掛掉了,消費者接消息時掛掉了,以下方法會讓RabbitMQ重新將該消息添加到隊列中:

  • 回調函數中的ch.basic_ack(delivery_tag=method.delivery_tag),消費端需要做的
  • basic_comsume中的no_ack=False,消費端需要做的
  • 發佈消息端的basic_publish添加參數properties=pika.BasicProperties(delivery_mode=2),生產者端需要做的
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')  # 如果有,略過;如果沒有,創建隊列
channel.basic_publish(exchange='',
                      routing_key='Hi',
                      body='hello!world!!!',
                      properties=pika.BasicProperties(delivery_mode=2)) #消息持久化
print("[x] sent 'hello,world!'")
connection.close()
生產者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')
# 定義回調函數
def callback(ch, method, properties, body):
    print('[x] Recieved %r' % body)
    # channel.close()
    ch.basic_ack(delivery_tag=method.delivery_tag)
# no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq
channel.basic_consume(callback, queue='Hi',
                      no_ack=True)
print('[*] Waiting for msg')
channel.start_consuming()
消費者

消息獲取順序

預設消息隊列里的數據是按照順序被消費者拿走,例如:消費者1去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。但有大部分情況下,消息隊列後端的消費者伺服器的處理能力是不相同的,這就會出現有的伺服器閑置時間較長,資源浪費的情況,那麼,我們就需要改變預設的消息隊列獲取順序!

channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列,這是消費者端需要做的

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')
# 定義回調函數
def callback(ch, method, properties, body):
    print('[x] Recieved %r' % body)
    # channel.close()
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)     #改變預設獲取順序,誰來誰取
# no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq
channel.basic_consume(callback, queue='Hi',
                      no_ack=True)
print('[*] Waiting for msg')
channel.start_consuming()
消費者

發佈和訂閱

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發佈和訂閱時,會為每一個訂閱者創建一個隊列,而發佈者發佈消息時,會將消息放置在所有相關隊列中。

RabbitMQ中,所有生產者提交的消息都由Exchange來接受,然後Exchange按照特定的策略轉發到Queue進行存儲 。RabbitMQ提供了四種Exchange:fanout,direct,topic,header header模式在實際使用中較少,只對前三種模式進行比較。

exchange type = fanout

任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有Queue上。

1.可以理解為路由表的模式

2.這種模式不需要RouteKey

3.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。

4.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。

import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_fanout',type='fanout')
msg='456'
channel.basic_publish(exchange='logs_fanout',routing_key='',body=msg)
print('開始發送:%s'%msg)
connection.close()
生產者
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_fanout',type='fanout')
#隨機創建隊列
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
#綁定相關隊列名稱
channel.queue_bind(exchange='logs_fanout',queue=queue_name)
def callback(ch,method,properties,body):
    print('[x] %r'%body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
消費者

 關鍵字

任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue。 1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字元串,下文稱其為default Exchange)。  2.這種模式下不需要將Exchange進行任何綁定(binding)操作  3.消息傳遞時需要一個“RouteKey”,可以簡單的理解為要發送到的隊列名字。  4.如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct_test1',type='direct')
serverity='error'
msg='123'
channel.basic_publish(exchange='logs_direct_test1',routing_key=serverity,body=msg)
print('開始發送:%r:%r'%(serverity,msg))
connection.close()
生產者
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct_test1',type='direct')
#隨機創建隊列
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
serverities=['error','info','warning',]
for serverity in serverities:
    channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity)
print('[***] 開始接受消息!')
def callback(ch,method,properties,body):
    print('[x] %r:%r'%(method.routing_key,body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
消費者1
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct_test1',type='direct')
#隨機創建隊列
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
serverities=['error',]
for serverity in serverities:
    channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity)
print('[***] 開始接受消息!')
def callback(ch,method,properties,body):
    print('[x] %r:%r'%(method.routing_key,body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
消費者2

模糊訂閱

任何發送到Topic Exchange的消息都會被轉發到所有關心RouteKey中指定話題的Queue上 1.這種模式較為複雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(RouteKey),Exchange會將消息轉發到所有關註主題能與RouteKey模糊匹配的隊列。  2.這種模式需要RouteKey,也許要提前綁定Exchange與Queue。  3.在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及log的消息(一個RouteKey為”MQ.log.error”的消息會被轉發到該隊列)。  4.“#”表示0個或若幹個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。  5.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此消息。
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生產者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
消費者 好文推薦: http://hwcrazy.com/b5fce358672411e3baa0000d601c5586/group/free_open_source_project/

 

 

  

  

 

 

 

 

  


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • @(Java)[Struts|Interceptor] Struts2 更改校驗配置文件位置 在Struts2中提供的攔截器校驗 ,該校驗器中預設的配置文件位於Action包的位置下,需要和Action類放在一起,而提供的註解又不能針對每個方法不同的參數校驗,只能使用配置文件方式來實現同一個Acti ...
  • 父類: 子類: 輸出: the entrance. test.Child@2a139a55class test.Child call Child1 functiontest.Child@2a139a55class test.Childcall Child1 functionException in ...
  • 我們訪問資源需要關註對資源的鎖定、對資源的申請和釋放,還有考慮可能遇到的各種異常。這些事項本身與代碼的邏輯操作無關,但我們不能遺漏。也就是說進入方法時獲取資源,退出方法時釋放資源。這種處理就進入了Execute Around模式的範疇。 在scala里可以用函數值實現這種模式。下麵是一個示例,使用R... ...
  • 題意不難理解,但是一開始還是沒有看清楚題目。Replace the first occurrence of the find string within the text by the replace-by string, then try to perform the same replaceme ...
  • 一、事務 簡單點說,事務就是一件事情。所有與事務相關的內容都是圍繞這一件事情展開的。 二、事務的特性:ACID A:Atomicity(原子性),事務必須是一個不可分割的整體。 C:Consistency(一致性),執行完資料庫操作後,數據不會被破壞。如:從 A 賬戶轉賬到 B,要保證 A 賬戶扣錢 ...
  • 在前邊的文章中對log4j的配置文件進行了說明,今天介紹如何在普通的javaWeb項目中使用log4j。 在日常的開發過程中,日誌使用的很頻繁,我們可以利用日誌來跟蹤程式的錯誤,程式運行時的輸出參數等,很多情況下可能會使用System.out.println()這個方法,但是還有一種更加簡潔的方式, ...
  • C++對象模型是比較重要的一個知識點,學習C++對象的記憶體模型,就可以明白C++中的多態原理、類的初始化順序問題、類的大小問題等。 1 C++對象模型基礎 1.1 C++對象中都有哪些東東 C++對象中包括以下內容: 靜態常量 成員變數 成員函數 虛函數 純續函數 ... 以下是一個對象的定義: 1 ...
  • getch, 這個立即接收輸入的字元. 一個微不足道的函數, 就改變了游戲交互對於linux的認識. 本文構建一個跨平臺的getch, 滿足及時輸入交互. ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...