1、安裝erlang 2、安裝rabbitMQ 3、添加用戶 4、將角色添加到管理員組 5、設置用戶許可權 6、啟用web插件 7、啟動rabbitMQ服務 8、訪問http://192.168.10.10:15672/,如果一切順利的話你會看到如下界面 9、輸入用戶名密碼進入rabbitMQ後臺,你 ...
1、安裝erlang
[root@localhost ~]#yum -y install erlang
2、安裝rabbitMQ
[root@localhost ~]#yum -y install rabbitmq-server
3、添加用戶
[root@localhost ~]# rabbitmqctl add_user rabbit_user 123.com // 添加admin的用戶密碼為123.com
4、將角色添加到管理員組
[root@localhost ~]# rabbitmqctl set_user_tags rabbit_user administrator
5、設置用戶許可權
[root@localhost ~]# rabbitmqctl set_permissions -p "/" rabbit_user ".*" ".*" ".*" //set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
6、啟用web插件
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_management
7、啟動rabbitMQ服務
[root@localhost ~]# systemctl start rabbitmq-server
8、訪問http://192.168.10.10:15672/,如果一切順利的話你會看到如下界面
9、輸入用戶名密碼進入rabbitMQ後臺,你會看到像下麵這個樣子。
到此rabbitMQ已經可以正常運行了,下麵我們使用Python來操作隊列。
1、安裝pika模塊
[root@localhost ~]# pip3 install pika
2、創建生產者模型
[root@localhost rabbitMQ]# vim producer.py #!/usr/bin/env python import pika # 創建憑證,使用rabbitmq用戶密碼登錄 credentials = pika.PlainCredentials("rabbit_user","123.com") # 新建連接,這裡localhost可以更換為伺服器ip connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials)) # 創建通道 channel = connection.channel() # 在通道內聲明一個隊列,用於接收消息,隊列名字叫“test_queue” channel.queue_declare(queue='test_queue') # 註意在rabbitmq中,消息想要發送給隊列,必須經過交換(exchange),我們暫且將(exchange=''), # 它允許我們精確的指定發送給哪個隊列(routing_key=''),參數body值發送的數據 while True: content = input("生產者輸入數據>>>") if content.upper() == "Q": break channel.basic_publish(exchange='', routing_key='test_queue', body=content) print("已經發送了消息") # 程式退出前,確保刷新網路緩衝以及消息發送給rabbitmq,需要關閉本次連接 connection.close()
3、運行生產者模型
[root@localhost rabbitMQ]# python3 producer.py
4、創建消費者模型
[root@localhost rabbitMQ]# vim consumer.py #!/usr/bin/env python import pika # 建立與rabbitmq的連接 credentials = pika.PlainCredentials("rabbit_user","123.com") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials)) # 創建通道 channel = connection.channel() # 在通道中創建隊列 channel.queue_declare(queue="test_queue") def callbak(ch,method,properties,body): print("消費者接收了消息:%r"%body.decode("utf8")) # 有消息來時,立即執行callbak。 channel.basic_consume("test_queue",callbak,auto_ack=True) # 等待接收消息 channel.start_consuming()
auto_ack=True:表示不確認機制也就是說每次消費者接收到數據後,不管是否處理完畢,rabbitmq-server都會把這個消息標記完成,從隊列中刪除,這樣會有個缺陷,就是當我們從隊列中獲取到消息後,碰巧程式崩潰,或者什麼其它原因導致程式終,此時消息已經從消息隊列中刪除,造成數據的不安全。
5、運行消費者模型
[root@localhost rabbitMQ]# python3 consumer.py
6、驗證消息隊列的輪詢機制
7、下麵演示auto_ack=True的不安全機制,我們在消費者模型中主動拋出異常模擬程式非正常終止,然後查看消息隊列。
7.1修改消費者模型(主動拋出異常)如下:
[root@localhost rabbitMQ]# vim consumer.py #!/usr/bin/env python import pika # 建立與rabbitmq的連接 credentials = pika.PlainCredentials("rabbit_user","123.com") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials)) # 創建通道 channel = connection.channel() # 在通道中創建隊列 channel.queue_declare(queue="test_queue") def callbak(ch,method,properties,body): # 主動拋出異常 raise TypeError print("消費者接收了消息:%r"%body.decode("utf8")) # 有消息來時,立即執行callbak。 channel.basic_consume("test_queue",callbak,auto_ack=True) # 等待接收消息 channel.start_consuming()
7.2運行生產者模型
[root@localhost rabbitMQ]# python3 producer.py
7.3運行消費之模型,查看隊列變化
8、使用相對可靠的消息機制確認來保證數據安全
修改消費者模型的配置文件如下;
[root@localhost rabbitMQ]# vim consumer.py #!/usr/bin/env python import pika # 建立與rabbitmq的連接 credentials = pika.PlainCredentials("rabbit_user","123.com") connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials)) # 創建通道 channel = connection.channel() # 在通道中創建隊列 channel.queue_declare(queue="test_queue") def callbak(ch,method,properties,body): # 主動拋出異常 raise TypeError print("消費者接收了消息:%r"%body.decode("utf8")) # 告訴消息隊列,我已經確認收到消息了 ch.basic_ack(delivery_tag=method.delivery_tag) # 有消息來時,立即執行callbak。 channel.basic_consume("test_queue",callbak,auto_ack=False) # 等待接收消息 channel.start_consuming()
這樣在消費者沒有確認的情況下,消息隊列中的消息是不會被刪除的。如下;
至於是選擇auto_ack確認機制還是使用auto_ack不確認機制,還需要你根據實際情況來定。
9、消息隊列的持久化
我們上面的配置如果rabbitMQ重啟服務,或者系統重啟都會導致隊列里的消息被清除掉。下麵我們修改生產者的配置文件使消息隊列持久化,即使重啟服務,消息隊列里的消息也不會被清除。
[root@localhost rabbitMQ]# vim producer.py
#!/usr/bin/env python import pika # 創建憑證,使用rabbitmq用戶密碼登錄 credentials = pika.PlainCredentials("rabbit_user","123.com") # 新建連接,這裡localhost可以更換為伺服器ip connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials)) # 創建通道 channel = connection.channel() # 在通道內聲明一個隊列,用於接收消息,隊列名字叫“test_queue”,durable=True表示持久化隊列 channel.queue_declare(queue='delivery_queue',durable=True) # 註意在rabbitmq中,消息想要發送給隊列,必須經過交換(exchange),我們暫且將(exchange=''), # 它允許我們精確的指定發送給哪個隊列(routing_key=''),參數body值發送的數據 while True: content = input("生產者輸入數據>>>") if content.upper() == "Q": break channel.basic_publish(exchange='', routing_key='delivery_queue', body=content, properties=pika.BasicProperties(delivery_mode=2)) # 2代表的是持久化隊列 print("已經發送了消息") # 程式退出前,確保刷新網路緩衝以及消息發送給rabbitmq,需要關閉本次連接 connection.close()
常用命令
// 新建用戶 rabbitmqctl add_user {用戶名} {密碼} // 設置許可權 rabbitmqctl set_user_tags {用戶名} {許可權} // 查看用戶列表 rabbitmqctl list_users // 為用戶授權 添加 Virtual Hosts : rabbitmqctl add_vhost <vhost> // 刪除用戶 rabbitmqctl delete_user Username // 修改用戶的密碼 rabbitmqctl change_password Username Newpassword // 刪除 Virtual Hosts : rabbitmqctl delete_vhost <vhost> // 添加 Users : rabbitmqctl add_user <username> <password> rabbitmqctl set_user_tags <username> <tag> ... rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> // 刪除 Users : delete_user <username> // 使用戶user1具有vhost1這個virtual host中所有資源的配置、寫、讀許可權以便管理其中的資源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' // 查看許可權 rabbitmqctl list_user_permissions user1 rabbitmqctl list_permissions -p vhost1 // 清除許可權 rabbitmqctl clear_permissions [-p VHostPath] User rabbitmqctl reset // 重啟應用 rabbitmqctl stop_app // 關閉應用 rabbitmqctl start_app // 啟動應用 rabbitmqctl list_queues // 查看隊列 rabbitmqctl list_exchanges // 查看exchangelist rabbitmqctl list_queues // 查看所有queue rabbitmqctl list_users // 查看所有用戶 rabbitmqctl list_bindings // 查看所有綁定exchange和queued 消息 rabbitmqctl list_queues name messages_ready messages_unacknowledged // 查看消息確認 rabbitmqctl status // 查看rabbitMQ的狀態信息。