RabbitMQ學習筆記02:Hello World!

来源:https://www.cnblogs.com/alongdidi/archive/2022/12/30/rabbitmq_tutorial_one.html
-Advertisement-
Play Games

參考資料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 前言 RabbitMQ是一個中間人,它接受和轉發消息。我們可以把它想象成一個郵局:當你把郵件投入郵箱的時候,你可以確信它最終會被投遞到收件人的手中。RabbitMQ就是那個郵箱、郵局和郵差。區別就在 ...


參考資料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 

 

 

前言

RabbitMQ是一個中間人,它接受和轉發消息。我們可以把它想象成一個郵局:當你把郵件投入郵箱的時候,你可以確信它最終會被投遞到收件人的手中。RabbitMQ就是那個郵箱、郵局和郵差。區別就在於RabbitMQ投遞的是二進位的消息數據。

這裡有一些術語需要說明。

發送、產生消息的程式我們稱之為生產者producer,使用此圖標表示

隊列queue就是一個有名字的郵箱,雖然消息會在RabbitMQ和你的應用程式之間流動,但是它們可以持久保存在隊列中。隊列會收到主機的記憶體和磁碟容量的限制。多個生產者可以向一個隊列發送消息,同時多個消費者consumer也可以從一個隊列中消費/接收消息。

隊列的圖標如下

接收、消費消息的程式我們稱之為消費者consumer,使用此圖標表示

註意,生產者、消費者以及RabbitMQ不需要位於同一臺伺服器上。事實上,絕大多數情況中,它們都分別位於不同的伺服器上。生產者和消費者可以是同一個程式實現。

這裡我們的實驗環境,生產者和消費者是獨立的程式文件,但是它們以及RabbitMQ都位於同一臺伺服器上。

 

 

Hello World!

學習RabbitMQ,其中的生產者和消費者需要用戶自己使用編程語言來實現。我本人是一名運維工程師,有過bash, awk的經驗,2017年學習廖雪峰的Python學了一半,近期又學習了B站高淇老師的Java前三章(包含面向對象),對於Python和Java語言算是一知半解,大概看得懂又寫不出來的水平。我自己嘗試了下,即使沒有編程基礎,只要我們嚴格按照官方的教程指導,也是可以將這些代碼實現的。

這部分我們使用python手寫生產者和消費者,使用RabbitMQ自帶的guest賬戶。生產者會發送一條簡單的消息給到名為hello的隊列,消費者從隊列會收到這條消息並將其列印出來。

簡單的流程圖如圖所示,隊列就有點類似於代表了消費者去接收了這條消息。

我們的操作系統自帶了python 3.9,無論是鍵入python又或者是python3,都是指向python3,這點可以從字元鏈接裡面看出來。

[root@rabbitmq-01 ~]# ls -l $(which python)
lrwxrwxrwx. 1 root root 9 Dec 12 20:51 /usr/bin/python -> ./python3
[root@rabbitmq-01 ~]# ls -l $(which python3)
lrwxrwxrwx. 1 root root 9 Dec 12 20:42 /usr/bin/python3 -> python3.9

我們需要使用到 Pika 庫來使得我們的python代碼可以連接RabbitMQ,這個庫也是官方推薦的。

python -m pip install pika --upgrade

如果沒有安裝pip的話,可以使用

python -m ensurepip --upgrade

接下來就可以開始正式寫python代碼了。

Sending

我們的第一個程式send.py會發送一條消息到隊列hello中。首先我們需要建立連接。

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

連接建立完畢,在發送消息之前,我們需要確保隊列hello必須存在,否則我們發送出去的消息就會被丟棄。因此我們創建隊列。

channel.queue_declare(queue='hello')

此時我們可以開始發送消息了,我們計劃發送消息Hello World!到隊列hello中去。

在mq中,消息無法直接發送到隊列中去,必須要經過exchange。目前我們還不需要對其進行展開理解,大家直到這麼個概念即可,後續在講解RabbitMQ tutorial - Publish/Subscribe — RabbitMQ的時候會涉及到。

我們現在只需要將其發送給預設的exchange,使用空字元串來識別它。它是一個特殊的exchange,允許我們指定要發送消息的隊列,隊列使用routing_key參數來指定。

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

這裡的print只是用來告訴用戶我們做了什麼。

消息發送完畢了,在我們退出程式前,我們應該確保網路buffer已經被清空,我們的消息真的發出去了。這裡通過優雅地關閉連接來實現。

connection.close()

發送到這裡就結束了,如果消息沒有發送成功的話,可以考慮從日誌入手排查問題。

Receiving

第二個程式是消費者receive.py,它將會從隊列中接收消息並將其列印出來。

首先我們需要連接到伺服器上,這部分代碼和生產者代碼相同。

接下來,同樣我們需要確保隊列的存在性。使用queue_declare創建隊列是冪等(idempotent)的,即使執行多次也只會創建一個隊列。

channel.queue_declare(queue='hello')

之所以在這裡重覆創建隊列,是為了

  • 我們可能不知道生產者和消費者程式,哪個會先運行,因此最好在兩端都創建隊列。
  • 創建隊列的操作需要具備冪等性。

從隊列中接收一個消息會更加複雜,它工作方式是訂閱一個callback函數到隊列上。每當隊列中有消息出現的時候,callback函數都會被Pika庫調用。在本案例中,callback函數會列印消息在屏幕上。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

接下來我們告訴mq,這個特定的callback函數需要從隊列hello接收消息。這一步需要確保隊列已存在,好在我們上面的代碼已經確保過了。

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

auto_ack參數將會在RabbitMQ tutorial - Work Queues — RabbitMQ中講解。

接下來我們進入無限迴圈,在迴圈中我們等待隊列中一旦出現新的消息,我們就會將其消費掉然後輸出消息的內容。

直到用戶輸入Ctrl+C來停止程式。

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Putting it all together

send.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receive.py

#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

把python代碼文件放到伺服器上,準備開始測試。

首先打開第一個終端,運行消費者程式receive.py,它會占用前端一直運行下去,一旦有消息就會列印,直到來自用戶的終止指令Ctrl+C

[root@rabbitmq-01 code]# python receive.py 
 [*] Waiting for messages. To exit press CTRL+C

打開第二個終端,運行生產者程式send.py,每運行一次它都會向隊列hello發送一條消息Hello World!同時輸出到終端,隨後它就會退出,不會占用前端資源。

[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'

此時我們回到消費者終端,它會按照程式中說的消費掉隊列中的消息並將其輸出。

[root@rabbitmq-01 code]# python receive.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

send.py可以反覆執行。

[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'
[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'
[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'
[root@rabbitmq-01 code]# python send.py 
 [x] Sent 'Hello World!'

消費者程式都可以捕獲。

[root@rabbitmq-01 code]# python receive.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'

我們可以通過rabbitmqctl list_queues來查看mq實例中當前存在的隊列以及隊列中的消息數量。

[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
hello	0

因為消息在一瞬間就被消費掉了,所以我們看到的消息數量都會是0。

想要退出的話就是在消費中終端執行Ctrl+C

 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
^CInterrupted

 

本片文章的內容就到此為止了,我們學會了RabbitMQ的基本概念(生產者、消費者和隊列)。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Git 分散式版本控制工具 Git最常用命令 | 命令名稱 | 作用 | | | | |git init|初始化本地庫| |git add ./文件名|把代碼添加到暫存區| |git commit -m '日誌'|把暫存區的文件添加到本地庫| |git push 鏈接 分支名|把本地庫的代碼載入到遠 ...
  • 一些開源項目包含了各種編程的最佳實踐供人參考學習和借鑒。但是也有一些開源項目雖然初衷是好的。但是包含了一些代碼的壞實踐。特別是對於一部分剛入行的大學生來說,可能會給到一些錯誤的示範。於是在此列舉一些項目中的壞實踐。 1.方法的用意判斷是與否卻返回字元串的“0”或者“1” 如果一個方法明確返回是與否這 ...
  • 體驗方法引用 通過方法引用,來使用已經存在的方案。 定義一個介面,裡面有一個抽象方法: public interface Printable { void printString(String s); } 定義一個測試類: public class PrintDemo { public static ...
  • IHostedService和BackgroundService 前言 平時寫代碼的時候,大家多多少少都會遇到定時任務的要求,今天介紹兩種定時任務的寫法,嚴格來說其實是一種。 相對來說比較粗糙,請多多指教。 方法一 IHostedService 簡介 ######## IHostedService ...
  • 擴展方法名:Filter 支持參數:實體類、JObject 擴展代碼: //白色風車 public static class EntityFrameworkCoreExtensions { private static DbCommand CreateCommand(DatabaseFacade f ...
  • 主題 1 The Shell 課程概覽與 shell · the missing semester of your cs education (missing-semester-cn.github.io) Shell是什麼? 一旦你想脫離可視化界面讓你做的,然後做點別的事情,那麼Shell將是你和計 ...
  • -- 題圖:蘇州天平山楓葉 現在是 2022 年末,痞子衡又要起筆博文年終總結了,看著 2020 年之前的博文總結缺失,始終覺得缺憾,所以寫下此篇 2016 - 2019 總結合輯。2016 年之前,痞子衡也發表過一些文章,不過沒有持續性,那時候更多是以個人筆記形式留在硬碟里。2016 年是痞子衡正 ...
  • Shell變數 變數是任何一種編程語言都必不可少的組成部分,變數用來存放各種數據。腳本語言在定義變數時通常不需要指明類型,直接賦值就可以,Shell 變數也遵循這個規則。 在 Bash shell 中,每一個變數的值都是字元串,無論你給變數賦值時有沒有使用引號,值都會以字元串的形式存儲。 這意味著, ...
一周排行
    -Advertisement-
    Play Games
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...
  • 目錄前言PostgreSql安裝測試額外Nuget安裝Person.cs模擬運行Navicate連postgresql解決方案Garnet為什麼要選擇Garnet而不是RedisRedis不再開源Windows版的Redis是由微軟維護的Windows Redis版本老舊,後續可能不再更新Garne ...
  • C#TMS系統代碼-聯表報表學習 領導被裁了之後很快就有人上任了,幾乎是無縫銜接,很難讓我不想到這早就決定好了。我的職責沒有任何變化。感受下來這個系統封裝程度很高,我只要會調用方法就行。這個系統交付之後不會有太多問題,更多應該是做小需求,有大的開發任務應該也是第二期的事,嗯?怎麼感覺我變成運維了?而 ...
  • 我在隨筆《EAV模型(實體-屬性-值)的設計和低代碼的處理方案(1)》中介紹了一些基本的EAV模型設計知識和基於Winform場景下低代碼(或者說無代碼)的一些實現思路,在本篇隨筆中,我們來分析一下這種針對通用業務,且只需定義就能構建業務模塊存儲和界面的解決方案,其中的數據查詢處理的操作。 ...
  • 對某個遠程伺服器啟用和設置NTP服務(Windows系統) 打開註冊表 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\TimeProviders\NtpServer 將 Enabled 的值設置為 1,這將啟用NTP伺服器功 ...
  • title: Django信號與擴展:深入理解與實踐 date: 2024/5/15 22:40:52 updated: 2024/5/15 22:40:52 categories: 後端開發 tags: Django 信號 松耦合 觀察者 擴展 安全 性能 第一部分:Django信號基礎 Djan ...
  • 使用xadmin2遇到的問題&解決 環境配置: 使用的模塊版本: 關聯的包 Django 3.2.15 mysqlclient 2.2.4 xadmin 2.0.1 django-crispy-forms >= 1.6.0 django-import-export >= 0.5.1 django-r ...
  • 今天我打算整點兒不一樣的內容,通過之前學習的TransformerMap和LazyMap鏈,想搞點不一樣的,所以我關註了另外一條鏈DefaultedMap鏈,主要調用鏈為: 調用鏈詳細描述: ObjectInputStream.readObject() DefaultedMap.readObject ...
  • 後端應用級開發者該如何擁抱 AI GC?就是在這樣的一個大的浪潮下,我們的傳統的應用級開發者。我們該如何選擇職業或者是如何去快速轉型,跟上這樣的一個行業的一個浪潮? 0 AI金字塔模型 越往上它的整個難度就是職業機會也好,或者說是整個的這個運作也好,它的難度會越大,然後越往下機會就會越多,所以這是一 ...
  • @Autowired是Spring框架提供的註解,@Resource是Java EE 5規範提供的註解。 @Autowired預設按照類型自動裝配,而@Resource預設按照名稱自動裝配。 @Autowired支持@Qualifier註解來指定裝配哪一個具有相同類型的bean,而@Resourc... ...