RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。 MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通信方法。應用程式通過讀寫出入隊列的消息(針對應用程式的數據)來通信,而無需專用連接來鏈 ...
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。 MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通信方法。應用程式通過讀寫出入隊列的消息(針對應用程式的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程式之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程式通過 隊列來通信。隊列的使用除去了接收和發送應用程式同時執行的要求。 安裝RabbitMQ:
安裝配置epel源:(詳見http://www.cnblogs.com/ernest-zhang/p/5714434.html) 安裝erlang: yum -y install erlang 註:安裝erlang的時候碰到 Error: Package: erlang-erts-R14B-04.3.el6.i686 (epel) Requires: libz.so.1(ZLIB_1.2.2) [root@localhost ~]# yum whatprovides libz.so.1 Loaded plugins: rhnplugin This system is not registered with RHN. RHN support will be disabled. zlib-1.2.3-25.el6.i686 : The zlib compression and decompression library #提供壓縮與解壓縮庫 Repo : local Matched from: Other : libz.so.1 檢查發現應該是zlib的版本太老了,從網上下載最新的zlib-1.2.8-10.fc24.i686,然後使用RPM安裝後解決。 下載地址:http://www.zlib.net/ #zlib官網 http://rpmfind.net/linux/rpm2html/search.php?query=zlib #zlib下載網站 安裝rabbitMQ: yum -y install rabbitmq-serverservice rabbitmq-server start/stop 啟動和停止rabbitmq 安裝API,然後可以基於API操作rabbitmq
pip install pika or easy_install pika or 源碼 https://pypi.python.org/pypi/pikaPython 操作RabbitMQ 發佈端:
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) #伺服器地址 channel=connection.channel() channel.queue_declare(queue='Hi') #如果有隊列,略過;如果沒有,創建隊列 channel.basic_publish(exchange='',routing_key='cc',body='hello!world!!!') print("[x] sent 'hello,world!'") connection.close()
接收端:
import pika #創建一個連接對象,綁定rabbitmq的IP connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) #創建一個頻道對象 channel=connection.channel() #頻道中聲明指定queue,如果MQ中沒有指定queue就創建,如果有,則略過 channel.queue_declare(queue='Hi') #定義回調函數 def callback(ch,method,properties,body): print('[x] Recieved %r'%body) # channel.close() #no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq,callback:回調函數,queue:指定隊列 channel.basic_consume(callback,queue='Hi',no_ack=True) # channel.basic_consume(callback,queue='cc') print('[*] Waiting for msg') channel.start_consuming()
1、acknowledgment 消息不丟失
no-ack = False,如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會重新將該任務添加到隊列中。
- 回調函數中的
ch.basic_ack(delivery_tag=method.delivery_tag)
- basic_comsume中的
no_ack=False
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.queue_declare(queue='Hi') # 定義回調函數 def callback(ch, method, properties, body): print('[x] Recieved %r' % body) # channel.close() ch.basic_ack(delivery_tag=method.delivery_tag) # no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq channel.basic_consume(callback, queue='Hi', no_ack=False) print('[*] Waiting for msg') channel.start_consuming()View Code
durable 消息不丟失
消息生產者端發送消息時掛掉了,消費者接消息時掛掉了,以下方法會讓RabbitMQ重新將該消息添加到隊列中:
- 回調函數中的
ch.basic_ack(delivery_tag=method.delivery_tag)
,消費端需要做的 - basic_comsume中的
no_ack=False
,消費端需要做的 - 發佈消息端的basic_publish添加參數
properties=pika.BasicProperties(delivery_mode=2)
,生產者端需要做的
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.queue_declare(queue='Hi') # 如果有,略過;如果沒有,創建隊列 channel.basic_publish(exchange='', routing_key='Hi', body='hello!world!!!', properties=pika.BasicProperties(delivery_mode=2)) #消息持久化 print("[x] sent 'hello,world!'") connection.close()生產者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.queue_declare(queue='Hi') # 定義回調函數 def callback(ch, method, properties, body): print('[x] Recieved %r' % body) # channel.close() ch.basic_ack(delivery_tag=method.delivery_tag) # no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq channel.basic_consume(callback, queue='Hi', no_ack=True) print('[*] Waiting for msg') channel.start_consuming()消費者
消息獲取順序
預設消息隊列里的數據是按照順序被消費者拿走,例如:消費者1去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。但有大部分情況下,消息隊列後端的消費者伺服器的處理能力是不相同的,這就會出現有的伺服器閑置時間較長,資源浪費的情況,那麼,我們就需要改變預設的消息隊列獲取順序!
channel.basic_qos(prefetch_count=1)
表示誰來誰取,不再按照奇偶數排列,這是消費者端需要做的
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.queue_declare(queue='Hi') # 定義回調函數 def callback(ch, method, properties, body): print('[x] Recieved %r' % body) # channel.close() ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) #改變預設獲取順序,誰來誰取 # no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq channel.basic_consume(callback, queue='Hi', no_ack=True) print('[*] Waiting for msg') channel.start_consuming()消費者
發佈和訂閱
發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發佈和訂閱時,會為每一個訂閱者創建一個隊列,而發佈者發佈消息時,會將消息放置在所有相關隊列中。
RabbitMQ中,所有生產者提交的消息都由Exchange來接受,然後Exchange按照特定的策略轉發到Queue進行存儲 。RabbitMQ提供了四種Exchange:fanout,direct,topic,header header模式在實際使用中較少,只對前三種模式進行比較。exchange type = fanout
任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有Queue上。
1.可以理解為路由表的模式
2.這種模式不需要RouteKey
3.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。
4.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_fanout',type='fanout') msg='456' channel.basic_publish(exchange='logs_fanout',routing_key='',body=msg) print('開始發送:%s'%msg) connection.close()生產者
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_fanout',type='fanout') #隨機創建隊列 result=channel.queue_declare(exclusive=True) queue_name=result.method.queue #綁定相關隊列名稱 channel.queue_bind(exchange='logs_fanout',queue=queue_name) def callback(ch,method,properties,body): print('[x] %r'%body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()消費者
關鍵字
任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue。 1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字元串,下文稱其為default Exchange)。 2.這種模式下不需要將Exchange進行任何綁定(binding)操作 3.消息傳遞時需要一個“RouteKey”,可以簡單的理解為要發送到的隊列名字。 4.如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_direct_test1',type='direct') serverity='error' msg='123' channel.basic_publish(exchange='logs_direct_test1',routing_key=serverity,body=msg) print('開始發送:%r:%r'%(serverity,msg)) connection.close()生產者
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_direct_test1',type='direct') #隨機創建隊列 result=channel.queue_declare(exclusive=True) queue_name=result.method.queue serverities=['error','info','warning',] for serverity in serverities: channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity) print('[***] 開始接受消息!') def callback(ch,method,properties,body): print('[x] %r:%r'%(method.routing_key,body)) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()消費者1
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_direct_test1',type='direct') #隨機創建隊列 result=channel.queue_declare(exclusive=True) queue_name=result.method.queue serverities=['error',] for serverity in serverities: channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity) print('[***] 開始接受消息!') def callback(ch,method,properties,body): print('[x] %r:%r'%(method.routing_key,body)) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()消費者2
模糊訂閱
任何發送到Topic Exchange的消息都會被轉發到所有關心RouteKey中指定話題的Queue上 1.這種模式較為複雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(RouteKey),Exchange會將消息轉發到所有關註主題能與RouteKey模糊匹配的隊列。 2.這種模式需要RouteKey,也許要提前綁定Exchange與Queue。 3.在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及log的消息(一個RouteKey為”MQ.log.error”的消息會被轉發到該隊列)。 4.“#”表示0個或若幹個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。 5.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此消息。#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()生產者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()消費者 好文推薦: http://hwcrazy.com/b5fce358672411e3baa0000d601c5586/group/free_open_source_project/