4.3萬字詳解PHP+RabbitMQ(AMQP協議、通訊架構、6大模式、交換機隊列消息持久化、死信隊列、延時隊列、消息丟失、重覆消費、消息應答、消息應答、發佈確認、故障轉移、不公平分發、優先順序、等)

来源:https://www.cnblogs.com/phpphp/p/18171862
-Advertisement-
Play Games

理論(後半部分有實操詳解) 哲學思考 易經思維:向各國人講述一種動物叫烏龜,要學很久的各國語言,但是隨手畫一個烏龜,全世界的人都能看得懂。 道家思維:努力沒有用(指勞神費心的機械性重覆、肢体受累、刻意行為),要用心(深度思考、去感悟、透過現象看本質)才有用。 舉例:類似中學做不出來的幾何題的底層原理 ...


理論(後半部分有實操詳解)

哲學思考

  • 易經思維:向各國人講述一種動物叫烏龜,要學很久的各國語言,但是隨手畫一個烏龜,全世界的人都能看得懂。
  • 道家思維:努力沒有用(指勞神費心的機械性重覆、肢体受累、刻意行為),要用心(深度思考、去感悟、透過現象看本質)才有用。
  • 舉例:類似中學做不出來的幾何題的底層原理:不是不知道xx定理或公式(招式),而是不知道畫輔助線的思路(內功)。
  • 總結:萬事萬物、用道家思維思考本質與規律,用易經思維從眾多信息中簡化模型練出來的內功,叫覺悟(內力),而知識僅僅是外功(招式)。

做研發的,可能距離成功一步之遙,別因為一葉障目而放棄。

消息隊列與消息中間件

  • 消息隊列:一種存儲消息的數據結構,消息隊列通常是一種先進先出(FIFO)的數據結構,可以確保消息的順序傳遞。類比Redis的List,先進先出類比lPush和rPop。
  • 消息中間件:是管理消息傳遞的一種組件。功能包含消息隊列。
  • 中間件:中間件是指位於客戶端(或調用端)和服務端之間的組件,用於協調、存儲、介面、管理組件之間的通信,類比賣家與買家之間的快遞驛站,不是一定要用,但是最好得有。

AMQP協議

AMQP(Advanced Message Queuing Protocol),是一種用於消息傳遞協議,類比成HTTP、TCP、UDP這種不同的協議就行。它定義了消息中間件客戶端與服務端(買家買家對驛站的溝通)的通信規則(怎麼運快遞),包括消息格式(什麼類型的快遞)、消息發佈(怎麼發快遞)、消息訂閱(怎麼收快遞)、隊列管理(怎麼處理快遞)等。

AMQP高效就高效在把通信(物流階段)能遇到的問題都解決了。

  • 非同步通信:AMQP支持非同步通信,可以讓消息發送者和接收者在不同的時間和速度進行消息傳遞。
  • 消息路由:AMQP協議定義了靈活的消息路由規則,可以根據消息內容自動將消息路由到指定的接收者。
  • 消息確認:AMQP支持消息確認機制,確保消息被可靠地接收。
  • 批量處理:AMQP協議支持批量消息處理,可以同時發送和接收多個消息,減少了網路通信的開銷和系統的資源消耗。

RabbitMQ

  • 官方文檔:https://www.rabbitmq.com/docs
  • 極簡概括:使用ErLang語言寫基於AMQP協議的C/S架構的消息中間件,用於不同組件之間高效的傳遞數據。
  • 解決問題:
    • 削峰:賣家一家伙運了幾十噸的貨(海量請求)、買家沒地方存、扛不住了,那就放驛站緩衝一下。
    • 解耦:沒有驛站中轉,快遞送過來硬塞給買家,買家不在,這個流程就走不下去(耦合)。
    • 非同步:賣家只要發貨、流程基本走完,剩下的流程交給物流和驛站(中間件),不影響賣家做其它事(非阻塞),買家也一樣、快遞放驛站、我忙我的。需要時再取(按需訂閱)。
    • 捲:面試加分項,工作用到減輕負擔。只想捲死各位,或者被各位捲死。
  • 適用場景
    • 流量削峰:當系統抗不出海量的請求的時候,把MQ放置在用戶端與業務端之間(強制排隊)削去部分峰值流量(Nginx令牌桶和滴水演算法、或基於Redis也能實現),這個過程和加鎖的缺點差不多,性能會受影響。
    • 應用解耦:小型項目用不上,大型項目中,庫存、訂單、支付、物流等各種模塊,為了防止硬關聯耦合度大,一節點掛掉其餘癱瘓,所以用MQ作為通信的橋梁。
    • 非同步處理:用的最多,非同步發簡訊,發郵件、導出導出文件、延時任務、自動取消訂單、推送通知、回調重試等。
    • 助力跳槽漲薪:只想捲死各位,或者被各位捲死。
    • 賦能成就感:RabbitMQ都用上了,感覺離架構師不遠了(呵呵)。
  • 優點:
    • 應用場景就是優點,主流。
    • 豐富的客戶端支持,支持PHP、Java、Python、Ruby、C、Golang、NodeJS、.NET、Objective-C、Swift。
  • 缺點:
    • 性能下降:流量削峰引起的性能下降。
    • 運維成本:多引入一個組件,就要考慮它的運維成本,以及各項配置問題。
    • 大數據處理:對於大數據、流數據、日誌分析、更適合Kafka,RabbitMQ性能會下降。
    • 安裝困難:成熟的軟體安裝不方便,Erlang環境依賴強,其次是官網還把CentOS7的安裝去掉了。
  • 同類產品:RocketMQ(僅支持Java和C++)、ActiveMQ(後期停止維護)、Kafka(大數據場景)、基於Redis實現的任務隊列(輕量級使用)、編程語言框架提供的支持(Laravel Queue)、AWSMQ(亞馬遜)、ApsaraMQ(阿裡巴巴)、PulsarMQ(Apache)。

工作架構圖

在這裡插入圖片描述

  • Producer:生產者,類比賣家。
  • Connection:客戶端與服務端的通信層。
  • Channel:這裡叫通道,類比要發那個快遞,或從那家快遞取貨。
  • Broker:用於接受和分發消息的處理,這是MQ Server要乾的事,類比驛站。
  • Exchange:交換機(不是switch),指定規則路由到那個隊列,類比分配到那個貨架的方法。
  • Consumer:消費者,類比買家。
  • 補充:每次訪問RabbitMQ都需要建立一個連接,海量的請求對於這塊的開銷是絕大的,所以需要在channel與connection內部建立邏輯連接,從而減少性能損耗。
  • 多租戶設計:每個Block裡面可以多個Vhost,Vhost裡面,可以有多個Exchange(交換機),每個Exchange可以有多個Queue。

四大概念的通俗理解

  • 生產者:顧名思義生產數據的角色,類比賣家發快遞。
  • 消費者:生產者產出的數據,給消費者使用,類比買家收快遞。
  • 交換機:,用於接受生產者的消息,通過指定模式(4大模式)和路由鍵(交換機與隊列綁定標識符)分發給隊列,一個交換機可綁定多個隊列,將消息路由到多個隊列,類比快遞驛站的分發到那個貨架。
  • 隊列:一種數據結構,存放消息隊列,類比快遞驛站的貨架。
  • 流程:生產者(發快遞)->交換機(驛站怎麼分類)->隊列(驛站怎麼存)->消費者(拿快遞)。

Exchange(交換機)

  • 極簡概括:位於生產者和隊列之間,用於接受生產者的消息,並分發給隊列,一個交換機可綁定多個隊列,將消息路由到多個隊列。
  • 解決問題:生產者發一條消息,讓所有或指定消費者能夠收到(類似廣播)。如果沒有交換機機制,只會有一個消費者能收到此消息。
  • 交換機4大類型:直接(direct)類型、主題(topic)類型、頭(headers)類型、扇出(fanout)類型(下文有詳解)。
  • 補充:在常規模式,工作隊列模式(生產者端,basic_publish方法參數2為空字元串),也有一個預設類型交換機,或稱之為無名Exchange。

Routing Key(路由鍵)

  • 極簡概括:交換機綁定隊列的標識符,一個交換機,可以有多個Routing key。
  • 解決問題:起個名用於區分,方便對不同隊列進行不同的操作,就像MySQL表id作用一樣。

死信

無法被消費的消息。

死信隊列

  • 極簡概括:隊列中的消息無法被消費,若沒有後續的處理,就成了死信隊列。
  • 解決問題:將消費異常的數據放入死信隊列,用於存儲消費失敗或者異常時的情況,確保失敗的消息能夠得到適當的處理(重試或由開發者調試查看用)。可以簡單的理解為找個地方存失敗的消費任務。也可將計就計,利用某些特性作為延時隊列使用。
  • 產生原因:
    • 消息TTL過期。
    • 隊列滿了。
    • 消息被拒絕(basic reject 或basic nack),並且requeue為false。

有Redis List去實現消息隊列,為什麼要RabbitMQ?

  • 持久化問題:RabbitMQ支持持久化,Redis雖然也支持持久化,但只要不是每次操作都持久化,那麼就有丟失數據的風險。
  • 消息應答問題:消息處理成功與失敗,Redis用隊列無法記錄,任務消息只會取一個少一個,而RabbitMQ可以。
  • 故障轉移問題:Redis哨兵機制、主從複製,是針對緩存高可用,做消息中間件有局限性。RabbitMQ支持消息重新入隊。如果某個消費者由於某些原因失去連接,導致消息未發送ACK確認,那麼RabbitMQ有讓消息重新排隊的機制,如果此時其它消費者可以處理,那就讓其它消費者處理。
  • 支持消息優先順序:RabbitMQ支持消息優先順序,而Redis不支持。
  • 廣播支持:RabbitMQ支持廣播,等指定隊列發送,而Redis不支持。
  • 路由轉發:RabbitMQ通過交換機機制,支持設定不同的分發隊列規則,滿足各個場景,而Redis List需要手動實現這塊內部機制。

有Redis Sorted Set、或者過期監聽去實現延時隊列,為什麼要RabbitMQ?

RabbitMQ是推模式還是拉模式?

都有,生產者發數據到MQ是推,消費者消費消息是拉。

通信方案選擇Push還是Pull?

推或拉是兩種通信的方向選擇,跟MQ無關,但是類似MQ,順便提一下。
個人認為:

  • 看業務場景,拋開業務場景談架構都是耍流氓
    • 要求實時性的,就選擇推送,輪詢耗費網路資源,調用端或客戶端每次請求,服務端都得執行一次,尤其是併發量大或者響應流程任務重的場景。
    • 不需要實時性的,就拉取,減低耦合,服務端就純粹的產生服務端的數據就行,客戶端或調用端誰想拉取讓它自己來。
  • 看改動開銷
    • 保證工程質量的前提下,那種方式開銷小,技術老大說用哪個,或者同事們習慣那一種,就用哪一種。
  • 看調用端可信任性。
    • 各種鑒權,驗證是一方面,數據傳輸也是一方面,對於不信任的平臺,白推送的數據,與對方直接請求獲取。還是有區別的。

RabbitMQ可以直連隊列嗎?

RabbitMQ內部不可以直連隊列,但是操作上可以直連隊列。
就算是常規(Hello World)模式,沒有聲明交換機,也會經過一個預設交換機。
不過這樣喪失了交換機靈活的路由分發功能,適用於簡單的場景。

實操

安裝

Docker安裝

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
瀏覽器訪問:http://192.168.xxx.xxx:15672

普通安裝

CentOS7的安裝RabbitMQ的教程,已經被官網刪除了,支持CentOS8,CentOS需要藉助外力。

安裝Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum -y install erlang

安裝RabbitMQ
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install rabbitmq-server

開啟服務,並設置開機自啟
systemctl start rabbitmq-server
systemctl enable rabbitmq-server

檢查狀態
systemctl status rabbitmq-server

啟動網頁端控制台
rabbitmq-plugins enable rabbitmq_management

開啟防火牆
firewall-cmd --zone=public --add-port=80/tcp --permanent
systemctl restart firewalld

新建網頁端登錄用戶,並配置角色與許可權
rabbitmqctl add_user admin 12345678
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

rabbitmqctl set_permissions -p <virtual_host> <username> <configure_permission> <write_permission> <read_permission>
-p <virtual_host>:指定虛擬主機的名稱,例如/(預設虛擬主機)。
<username>:要為其設置許可權的用戶名。
<configure_permission>:配置許可權的正則表達式。允許用戶對隊列、交換機等進行配置。
<write_permission>:寫許可權的正則表達式。允許用戶發佈消息。
<read_permission>:讀許可權的正則表達式。允許用戶獲取消息。

瀏覽器訪問
http://192.168.xxx.xxx:15672
用戶名admin,密碼12345678

命令行常用命令

systemctl start/stop/restart/status/enable rabbitmq-server # RabbitMQ Server開啟、關停、重啟、狀態查看、開機自啟

rabbitmq-plugins enable 插件名          # RabbitMQ Server安裝插件
rabbitmq-plugins list                   # 插件列表

rabbitmqctl version                     # 查看RabbitMQ Server版本
rabbitmqctl list_exchanges              # 查看交換機列表
rabbitmqctl list_queues                 # 查看隊列列表
rabbitmqctl list_bindings               # 查看綁定列表

PHP實現RabbitMQ Client

  • RabbitMQ6大模式官方教程:https://www.rabbitmq.com/tutorials
  • 官方擴展(不用):https://pecl.php.net/package/amqp/1.11.0/windows
    這個擴展官方下載地址有最後一版Windows系統的php_amqp.dll的下載地址,(用Windows是為了方便,在CentOS上還需要編譯,改完PHP代碼每次需要重新上傳,不想費事),但是我使用報錯,所以廢棄了。
  • 官方推薦:composer require php-amqplib/php-amqplib
    PHP操作RabbitMQ思路並不複雜,有6種工作模式,翻來覆去就是研究消息怎麼發,發到哪裡,怎麼處理消息的問題。
  • Laravel框架:可以使用php-amqplib/php-amqplib,去操作,也可以使用現成的laravel-queue-rabbitmq去操作。composer require vladimir-yuldashev/laravel-queue-rabbitmq

消費者常駐進程偏好

while (count($channel->callbacks)) {
    $channel->wait();
}
//或者
$channel->consume();

常規模式(Hello World!)

  • 極簡概括:生產者生產消息給消費者。
  • 解決問題:跨進程,或跨組件、跨網路通信,適用與兩個角色,但是一個PHP進程無法完成的時候用。
  • 備註:用MySQL、Redis也能實現。
  • 生產者端代碼:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//初始化連接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化通道
$channel = $connection->channel();

/*
參數1:隊列名
參數2:在聲明隊列時指定是否啟用passively模式,passively模式用於檢查隊列是否存在,而不是實際創建一個新隊列。如果隊列不存在,則會返回一個通知,而不會創建新隊列。
參數3:指定隊列的持久性。在這裡,它是false,表示隊列不是持久的。如果設置為true,則隊列將在伺服器重啟時或宕機後保留下來。 
參數4:指定隊列的排他性。如果設置為 true,則該隊列只能被聲明它的連接使用,一般用於臨時隊列。false表示隊列不是排它的。
參數5:指定隊列的自動刪除,如果設置為 true,則在隊列不再被使用時將自動刪除。在這裡,它是 false,表示隊列不會自動刪除。
*/
$channel->queue_declare('hello', false, false, false, false);

//編輯消息
$msg = new AMQPMessage('Hello World!');
//發送消息,交換機用不上,所以留空。這方法沒有返回值
$channel->basic_publish($msg, '', 'hello');


//用完了就關閉,釋放資源
$channel->close();
$connection->close();
  • 消費者端代碼
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//初始化連接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化通道
$channel = $connection->channel();

/*
參數1:隊列名
參數2:在聲明隊列時指定是否啟用passively模式,passively模式用於檢查隊列是否存在,而不是實際創建一個新隊列。如果隊列不存在,則會返回一個通知,而不會創建新隊列。
參數3:指定隊列的持久性。在這裡,它是false,表示隊列不是持久的。如果設置為true,則隊列將在伺服器重啟後保留下來。
參數4:指定隊列的排他性。如果設置為 true,則該隊列只能被聲明它的連接使用,一般用於臨時隊列。false表示隊列不是排它的。
參數5:指定隊列的自動刪除,如果設置為 true,則在隊列不再被使用時將自動刪除。在這裡,它是 false,表示隊列不會自動刪除。
*/
$channel->queue_declare('hello', false, false, false, false);


/*
參數1:隊列名稱
參數2:這是消費者標簽(consumer tag),用於唯一標識消費者。在這裡,它是空字元串,表示不為消費者指定任何特定的標簽。
參數3:如果設置了無本地欄位,則伺服器將不會向發佈消息的連接發送消息。
參數4:是指定是否自動確認消息(auto-ack)。設置為true則表示消費者在接收到消息後會立即確認消息,RabbitMQ將會將消息標記為已處理並從隊列中刪除。false表示消費者會手動確認消息,即在處理消息後,通過調用 $channel->basic_ack($deliveryTag) 手動確認消息。
參數5:指定是否獨占消費者。如果設置為true,則表示只有當前連接能夠使用該消費者。在這裡,它是true,表示只有當前連接可以使用這個消費者。
參數6:如果設置了,伺服器將不會對該方法作出響應。客戶端不應等待答覆方法。如果伺服器無法完成該方法,它將引發通道或連接異常。
參數7:回調參數,拿到數據怎樣處理。
*/
$channel->basic_consume('hello', '', false, true, false, false, function ($msg) {
    echo $msg->body;
});

//通過死迴圈持久化當前進程,實時消費
$channel->consume();

工作隊列模式(Work Queues)

  • 極簡概括:類比Redis的lPush、Rpop。但是RabbitMQ可以針對一個隊列有多個消費者,但一條消息,只能被一個消費者消費一次,不能多次消費。為了避免消費者處理數據傾斜問題(有的隊列處理任務多,有的處理的少),所以使用了輪詢的方式,挨個處理任務。
  • 解決問題:耗時且不需要串列執行的任務,可以丟給隊列,例如發簡訊、郵件、大量數據導入導出。
  • 測試普通用法

生產者代碼,在cli模式下,依次輸入1~10,執行10次

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);
//獲取命令行參數
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');

$channel->close();
$connection->close();

消費者1代碼,cli模式下運行,依次返回1、3、5、7、9,可見RabbitMQ不管消費節點處理的時間,只會根據消費者數量輪詢處理,哪怕其中任意幾個隊列任務重,其它隊列任務輕鬆。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->basic_consume('hello', '', false, true, false, false,
    function ($msg) {
        echo "收到消息,內容為{$msg->getBody()}\n";
        sleep(5);
        echo "成功處理消息\n";
    }
);

$channel->consume();

消費者2代碼,(與消費者1代碼唯一不同的,就是sleep函數的時間),cli模式下運行,依次返回2、4、6、8、10,可見RabbitMQ不管消費節點處理的時間,只會根據消費者數量輪詢處理,哪怕其中任意幾個隊列任務重,其它隊列任務輕鬆。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->basic_consume('hello', '', false, true, false, false,
    function ($msg) {
        echo "收到消息,內容為{$msg->getBody()}\n";
        sleep(10);
        echo "成功處理消息\n";
    }
);

$channel->consume();

發佈訂閱模式(Pub/Sub)

  • 極簡概括:生產者生產數據,所有隊列關聯的消費者都接收(類似廣播),使用交換機的扇出(Fanout)模式實現。
  • 解決問題:聊天室的功能,可以由它實現。

生產者代碼:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
/**
參數1:交換機名稱。
參數2:交換機類型,這裡是扇出。
參數3:當passive參數設置為true時,表示不會實際創建新的交換機或隊列,而是用來檢查已經存在的交換機或隊列是否已經存在。如果存在,則返回成功,如果不存在,則返回失敗。passive參數主要用於檢查交換機或隊列是否存在,而不是實際創建新的實體
參數4:交換機是否持久化,即當RabbitMQ伺服器重啟時,交換機會不會被重新創建。
參數5:當所有綁定的隊列都與交換機解綁後,是否自動刪除交換機。
*/
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);


$msg = new AMQPMessage('扇出測試');
//發送給指定的交換機
$channel->basic_publish($msg, 'fanout_test');


$channel->close();
$connection->close();

消費者1代碼:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//創建臨時隊列,用於接受隊列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊列綁定指定的交換機
$channel->queue_bind($queue_name, 'fanout_test');

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

消費者2代碼(同1):

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//創建臨時隊列,用於接受隊列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊列綁定指定的交換機
$channel->queue_bind($queue_name, 'fanout_test');

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

路由模式(Routing)

  • 極簡概括:通過路由鍵,將消息發送給指定的(任意數量)消費者,一個消費者可配置多個路由鍵。
  • 解決問題:只發送部分消費者。

生產端代碼,只讓消費者1消費。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('direct_test', 'direct', false, false, false);


$msg = new AMQPMessage('扇出測試');
//發送給指定的交換機,並指定路由鍵
$channel->basic_publish($msg, 'direct_test', 'consumer1');


$channel->close();
$connection->close();

消費者1代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//創建臨時隊列,用於接受隊列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊列綁定指定的交換機,並聲明路由鍵
$channel->queue_bind($queue_name, 'direct_test', 'consumer1');

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

消費者2代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//創建臨時隊列,用於接受隊列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊列綁定指定的交換機,並聲明路由鍵
$channel->queue_bind($queue_name, 'direct_test', 'consumer2');

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

主題模式(Topics)

  • 極簡概括:生產者獲取消息,編寫指定的匹配規則,使被匹配的隊列的消費者能夠消費消息。
  • 解決問題:實現更加靈活的,消息分發模式。
  • 備註:主題模式下的路由鍵,多個標識用多個點將其分開(例如aaa.bbb.ccc),最長不超過255個字元。支持通配符,*匹配一個標識(奇葩設計,和#就不應該反過來嗎),#匹配任意個標識。
    通配符用法*.*.xxx,表示xxx結尾的路由鍵,*.xxx.*表示包含xxx的路由鍵。a.#可以匹配a,也可以匹配,a.b。
    對應的,當一個隊列綁定的鍵是#,那就類似於fanout模式,如果一個隊列中沒有任何通配符,那就類似於direct模式。

生產者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明隊列類型為主題
$channel->exchange_declare('topic_test', 'topic', false, false, false);

$msg = new AMQPMessage('topic測試數據');


/*
以下路由鍵可以接受到消息
a.b
a.*.*
a.*.*.*
#.z
z
a.x.y.z
abc.z
*/
$arr = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z', 'x', 'y', 'z', 'a', 'ab', 'ac', 'ad','a.x.y.z', 'abc.z'];

foreach($arr as $v) {
    $channel->basic_publish($msg, 'topic_logs', $v);
}

$channel->close();
$connection->close();

消費者代碼

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明topic模式的交換機
$channel->exchange_declare('topic_test', 'topic', false, false, false);
//創建臨時隊列
$queue_name = $channel->queue_declare("", false, false, true, false)[0];

$binding_keys = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z'];
//綁定多個路由鍵
foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

$callback = function ($msg) {
    echo 'RoutingKey:', $msg->getRoutingKey(), ' --- Msg:', $msg->getBody(), "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

try {
    $channel->consume();
} catch (\Throwable $exception) {
    echo $exception->getMessage();
}

$channel->close();
$connection->close();

遠程過程調用模式(RPC)

  • 極簡概括:(對於PHP,RPC用的不多,中間件RPC用的更少)使得客戶端可以像調用本地方法一樣調用遠程方法,同時隱藏了底層網路通信細節。

調用端代碼

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RpcClient {
    private $connection;
    private $channel;
    private $queue_name;
    private $response;
    private $corr_id;

    public function __construct() {
        $this->connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
        $this->channel = $this->connection->channel();
        $this->queue_name = $this->channel->queue_declare("", false, false, true, false)[0];
        $this->channel->basic_consume($this->queue_name, '', false, true, false, false, array($this, 'onResponse'));
    }

    public function onResponse($rep) {
        if ($rep->get('correlation_id') == $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    public function call($str) {
        $this->response = null;
        $this->corr_id = uniqid();


        $this->channel->basic_publish(new AMQPMessage($str, [
            'correlation_id' => $this->corr_id,
            'reply_to' => $this->queue_name
        ]), '', 'rpc_queue');

        while (! $this->response) {
            $this->channel->wait();
        }

        return $this->response;
    }
}

$fibonacci_rpc = new RpcClient();
$response = $fibonacci_rpc->call('客戶端向服務端發送數據');
echo $response, "\n";

服務端代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

$callback = function ($req) {
    echo $req->getBody(), "\n";

    $msg = new AMQPMessage('服務端成功接收:'. $req->getBody(), ['correlation_id' => $req->get('correlation_id')]);

    $req->getChannel()->basic_publish($msg, '', $req->get('reply_to'));
    $req->ack();
};

$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

死信隊列(TTL過期)

  • 極簡概括:消息超時未處理,會被放置死信隊列中。
  • 備註:註釋很清楚,這需要先執行這段代碼(創建普通與死信,交換機、隊列、和路由鍵),然後再關閉(模擬消費者掛掉,10秒後,此時超時的消息就會存入死信隊列,至於死信的隊列的數據,是重試,還是給開發者提示,看產品需求)。
  • 現象:流程走完後,登錄控制台,到Queues選項卡,查看隊列列表的Ready列,原先的normal_queue隊列由原先的10變成了0(執行一次生產者代碼,裡面有一個for迴圈),而dead_queue裡面的Ready,由原先的0,變成了10(10個消息全部超時,到了死信隊列)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

//聲明普通交換機名稱
$normal_exchange    = 'normal_exchange';
//聲明死信交換機名稱
$dead_exchange      = 'dead_exchange';
//聲明普通隊列名稱
$normal_queue       = 'normal_queue';
//聲明死信隊列名稱
$dead_queue         = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key   = 'dead_routing_key';


//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機
$channel->exchange_declare($dead_exchange, 'direct', false, false);


//配置普通隊列異常時,轉發給死信隊列的荷載參數,就指望著這個普通隊列有問題了,才會把消息數據轉發到死信隊列,轉發到那裡,肯定是要配置的。
$payload = new AMQPTable();
//設置消息生存時間為15秒
$payload->set('x-message-ttl', 10000);
//定位普通隊列出異常了,要轉發的交換機
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉發的交換機還不夠,還得知道那個隊列,不然交換機不知道路由那個消息到達那個隊列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);


//聲明普通隊,就是等普通隊列出問題了,才把數據丟給死信隊列,所以普通(註意是普通隊列)隊列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊列,其實死信隊列本身是一個普通隊列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';


//綁定普通交換機與普通隊列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機與死信隊列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);


$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
    echo $msg->getBody(), "\n";
});

//常駐進程
$channel->consume();

$channel->close();
$connection->close();

然後再執行生產者代碼,模擬發消息。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->exchange_declare('normal_exchange', 'direct', false, false, true);


for($i = 0; $i< 10; $i++) {
    $msg = new AMQPMessage($i, [
        //配置過期時間為10秒,讓生產者控制過期時間
        'expiration' => '10000'
    ]);

    $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}


$channel->close();
$connection->close();

極簡死信隊列消費示例

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

死信隊列(隊列達到最大長度放不下)

  • 極簡概括:生產的消息數量超過了消費者端設置了最大長度,剩餘的消息會被放入死信隊列。
  • 註意:本次測試,需要把原先的隊列刪除,否則隊列長度設置不生效。
  • 現象:流程走完後,登錄控制台,到Queues選項卡,查看隊列列表的Ready列,原先的normal_queue隊列由原先的0變成了8(執行一次生產者代碼,裡面有一個for迴圈),而dead_queue裡面的Ready,由原先的0,變成了2(10 - 8)。

消費者端代碼,這一步是為了初始化普通和死信交換機、隊列、路由鍵,並且需要執行後Ctrl + C強制停止,保證生產者生產的消息,不被能正常的消費(不然怎麼演示不正常現象時的死信隊列?)。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

//聲明普通交換機名稱
$normal_exchange    = 'normal_exchange';
//聲明死信交換機名稱
$dead_exchange      = 'dead_exchange';
//聲明普通隊列名稱
$normal_queue       = 'normal_queue';
//聲明死信隊列名稱
$dead_queue         = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key   = 'dead_routing_key';


//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機
$channel->exchange_declare($dead_exchange, 'direct', false, false);


//配置普通隊列異常時,轉發給死信隊列的荷載參數,就指望著這個普通隊列有問題了,才會把消息數據轉發到死信隊列,轉發到那裡,肯定是要配置的。
$payload = new AMQPTable();
//設置最多存儲8條消息
$payload->set('x-max-length', 8);
//定位普通隊列出異常了,要轉發的交換機
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉發的交換機還不夠,還得知道那個隊列,不然交換機不知道路由那個消息到達那個隊列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);


//聲明普通隊,就是等普通隊列出問題了,才把數據丟給死信隊列,所以普通(註意是普通隊列)隊列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊列,其實死信隊列本身是一個普通隊列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';


//綁定普通交換機與普通隊列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機與死信隊列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);



$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
    echo $msg->getBody(), "\n";
});

//常駐進程
$channel->consume();

$channel->close();
$connection->close();

生產者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->exchange_declare('normal_exchange', 'direct', false, false, true);


for($i = 0; $i< 10; $i++) {
    $msg = new AMQPMessage($i, [
        //配置過期時間為10秒,讓生產者控制過期時間
        'expiration' => '10000'
    ]);

    $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}


$channel->close();
$connection->close();

極簡死信隊列消費示例

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

死信隊列(消息被拒絕,且requeue為false)

  • 極簡概括:消費者拒絕了消息,並且將requeue為false,的消息,會被放入死信隊列。
  • 註意:本次測試,需要把原先的隊列刪除,避免影響。
  • 現象:流程走完後,登錄控制台,到Queues選項卡,查看隊列列表的Ready列,原先的normal_queue隊列由原先的0變成了0(執行一次生產者代碼,裡面有一個for迴圈),而dead_queue裡面的Ready,由原先的0,變成了3(10 - 7,消費者的代碼中,有if判斷)。

這次讓消費者正常消費代碼即可,不用Ctrl + C強制中斷,正常接受生產者者的數據。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

//聲明普通交換機名稱
$normal_exchange    = 'normal_exchange';
//聲明死信交換機名稱
$dead_exchange      = 'dead_exchange';
//聲明普通隊列名稱
$normal_queue       = 'normal_queue';
//聲明死信隊列名稱
$dead_queue         = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key   = 'dead_routing_key';


//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機
$channel->exchange_declare($dead_exchange, 'direct', false, false);


//配置普通隊列異常時,轉發給死信隊列的荷載參數,就指望著這個普通隊列有問題了,才會把消息數據轉發到死信隊列,轉發到那裡,肯定是要配置的。
$payload = new AMQPTable();
//定位普通隊列出異常了,要轉發的交換機
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉發的交換機還不夠,還得知道那個隊列,不然交換機不知道路由那個消息到達那個隊列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);


//聲明普通隊,就是等普通隊列出問題了,才把數據丟給死信隊列,所以普通(註意是普通隊列)隊列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊列,其實死信隊列本身是一個普通隊列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';


//綁定普通交換機與普通隊列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機與死信隊列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);



$channel->basic_consume($normal_queue, '', false, false, false, false, function($msg) {
    if($msg->getBody() > 6)  {
        //手動拒絕消息,不批量,且不讓重入隊列
        $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, false);
    } else {
        //手動確認消息
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
});

//常駐進程
$channel->consume();

$channel->close();
$connection->close();

生產者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->exchange_declare('normal_exchange', 'direct', false, false, true);


for($i = 0; $i< 10; $i++) {
    $msg = new AMQPMessage($i, [
        //配置過期時間為10秒,讓生產者控制過期時間
        'expiration' => '10000'
    ]);

    $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}


$channel->close();
$connection->close();

極簡死信隊列消費示例

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

延時隊列(基於TTL過期式的死信隊列,不推薦使用)

  • 極簡概括:生產出來的隊列任務,不會立馬消費,而是等到指定時間。
  • 解決問題:自動確認訂單,自動取消未支付訂單,等。
  • 原理分析:常規的生產和消息鏈路配置了死信隊列的兜底機制,若普通隊列因TTL過期,會自動把消息放入死信隊列,利用這個特性,可以做出來延遲隊列,延時隊列的延遲機制,就是普通隊列的TTL過期時間,延時任務的處理機制,就是消費死信隊列的代碼段。因此可以專門選一個死信隊列,作為延時隊列來用。
  • 註意:
    • 例如延遲10秒,並不能保證一定會在第10秒被處理,會有小誤差(絕大部分業務場景能接受)。
    • 延遲消息是排隊處理的,第一個延遲10秒,第二個消息延遲5秒,預設情況下,第二個會延遲15秒,因為消息是排隊(隊列先進先出)處理的,所以這個情況需要優化,這是RabbitMQ死信隊列實現延時隊列的巨大缺陷。
      擴展:利用Redis的sorted set也可以作為延時隊列,key為唯一標識符,score為執行的時間的時間戳,val為要執行的序列化代碼,用while(true)起一個常駐進程,用於消費當前時間戳下的任務,如果要添加任務,就網Zset中添加數據。常駐進程到時間了,就會消費。

初始化普通、死信交換機、隊列、路由鍵。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

//聲明普通交換機名稱
$normal_exchange    = 'normal_exchange';
//聲明死信交換機名稱
$dead_exchange      = 'dead_exchange';
//聲明普通隊列名稱
$normal_queue       = 'normal_queue';
//聲明死信隊列名稱
$dead_queue         = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key   = 'dead_routing_key';


//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機
$channel->exchange_declare($dead_exchange, 'direct', false, false);


//配置普通隊列異常時,轉發給死信隊列的荷載參數,就指望著這個普通隊列有問題了,才會把消息數據轉發到死信隊列,轉發到那裡,肯定是要配置的。
$payload = new AMQPTable();
//定位普通隊列出異常了,要轉發的交換機
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉發的交換機還不夠,還得知道那個隊列,不然交換機不知道路由那個消息到達那個隊列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);


//聲明普通隊,就是等普通隊列出問題了,才把數據丟給死信隊列,所以普通(註意是普通隊列)隊列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊列,其實死信隊列本身是一個普通隊列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';


//綁定普通交換機與普通隊列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機與死信隊列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);

$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {

});

生產者代碼,產生延時任務

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->exchange_declare('normal_exchange', 'direct', false, false, true);

for($i = 0;$i < 10; $i++) {
    $msg = new AMQPMessage(microtime(true), [
        'expiration' => '3000'
    ]);

    $channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}


$channel->close();
$connection->close();

延時任務處理代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    $consumer_time = microtime(true);
    $product_time  = $msg->getBody();
    echo '時間處理誤差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

延時隊列(基於RabbitMQ延遲交換機插件,推薦使用)

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
發現如下字樣,就說明安裝成功。
Enabling plugins on node rabbit@lnmp:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@lnmp...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.


systemctl restart rabbitmq-server

生產者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$delay_exchange    = 'delay_exchange';
$delay_queue       = 'delay_queue';
$delay_routing_key = 'delay_routing_key';


//聲明交換機類型為direct
$payload = new AMQPTable();
$payload->set('x-delayed-type', 'direct');
//聲明延時隊列交換機,參數2的類型,是安裝插件後才有的,固定值
$channel->exchange_declare($delay_exchange, 'x-delayed-message', false, false, true, false, false, $payload);

//聲明一個自定義延遲隊列
$channel->queue_declare($delay_queue, false, false, false, false);
//隊列綁定交換機
$channel->queue_bind($delay_queue, $delay_exchange, $delay_routing_key);

//發送延遲消息
$msg = new AMQPMessage(microtime(true), [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    //這裡配置超時時間,固定格式。
    'application_headers' => new AMQPTable(['x-delay' => 5000])
]);

$channel->basic_publish($msg, $delay_exchange, $delay_routing_key);


$channel->close();
$connection->close();

消費者代碼

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    $consumer_time = microtime(true);
    $product_time  = $msg->getBody();
    echo '時間處理誤差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('delay_queue', '', false, true, false, false, $callback);

$channel->consume();


$channel->close();
$connection->close();

延時隊列提前執行或延後執行的方案

某些業務場景,可能需要提前觸發,或者延期處理,這就需要一些外的操作,才能完成。

隊列優先順序

  • 不同的隊列和可以設置不同的優先順序。
  • 在堆積的情況下才生效,否則生產一個消息處理一個,無阻塞,就不存在多個消息的優先處理的問題。
  • 要設置優先順序,那麼隊列和消息都需要設置優先順序。
  • 優先順序越大,越先被處理。
  • 支持1到255之間的優先順序,強烈建議使用1到5之間的值,更高的優先順序值需要更多的CPU和記憶體資源,因為RabbitMQ需要在內部為每個優先順序維護一個子隊列,從1到為給定隊列配置的最大值。

首先啟動生產者代碼:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$test_exchange    = 'test_exchange';
$test_queue       = 'test_queue';
$test_routing_key = 'test_routing_key';

$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);

$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//聲明一個自定義延遲隊列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//隊列綁定交換機
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);

for($i =0; $i < 10; $i++) {
    $msg = new AMQPMessage($i, [
        //為每個消息設置不同的優先順序
        'priority' => $i,
    ]);

    $channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
}

$channel->close();
$connection->close();

消費者代碼:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$callback = function ($msg) {
    echo $msg . "\n";
};
$channel->basic_consume('test_queue', '', false, true, false, false, $callback);

$channel->consume();

$channel->close();
$connection->close();

惰性隊列

惰性隊列用的極少,因為是存儲在磁碟中的。在消費者掛掉的情況下,避免RabbitMQ積累了太多的消息,消耗記憶體去採取的策略。

高可用

性能與高可用權衡的架構思維

加了RabbitMQ組件可以看做是分散式系統,分散式系統有有一個CAP定理,CAP只能同時滿足兩個。
對於RabbitMQ消息丟失,或者重覆消費的問題,若當前需求不能容忍,就需要額外的環節來彌補。如果當前需求(小概率)能容忍,去掉高可用環節,那麼在性能上就會提升。
例如支付寶,支付時總是會延遲幾秒鐘,一是走支付寶安全風控系統,二是要極致要求穩定高可用的環節必然性能下降,三是等支付回調,然而其它介面內容卻是瞬間載入。
所以沒有最好的架構,只有最合適的架構,所以要結合業務場景,判斷業務容忍度下限,與開發複雜度,在高可用與性能之間權衡,這是架構師必備思維。

如何避免RabbitMQ消息丟失?

  • 百分99.9%投遞策略:事先要處理的消息綁一個唯一標識(讓RabbitMQ和MySQL自始至終都能知道是哪一個,避免因業務需求生產兩份同樣的消息區分不出來),先寫入MySQL,並預定一個未消費的預設狀態,直到收到RabbitMQ的確認通知,改變狀態。再加一個迴圈定時任務(定時的周期看業務需求容忍度與組件性能),超時未處理的消息,讓定時任務手動觸發。
    但這種也不是沒有弊端,如果RabbitMQ的確認通知由於網路抖動沒有發出去,那麼定時任務就會讓其重覆消費。
  • 生產消息傳輸感知:RabbitMQ掛了,生產消息發送給MQ時,就會超時,報錯,此時把消息放入MySQL做兜底(可以臨時不處理,但是別丟),避免消息丟失。
  • RabbitMQ自帶的交換機、隊列、消息持久化機制。
  • RabbitMQ自帶的發送確認、消費確認、事務隊列機制。

如何防止RabbitMQ重覆消費?

  • 兜底策略:在數據集層上最好有個兜底策略,如唯一索引,更新數據前的狀態機判斷等,避免由於程式設計疏忽或者故障等原因導致的重覆消費。
  • 上游生產防重:消息重覆消費,一方面是消息重覆發送引起的,一般在介面處理的上游,會有一些防重策略(數據冪等),上游杜絕,也是一種策略。
  • 消費端上游判斷:首先把要處理的消息綁一個唯一標識(讓RabbitMQ和MySQL自始至終都能知道是哪一個,避免因業務需求生產兩份同樣的消息區分不出來),消費者獲取到消息時,再把唯一標識寫入具有唯一索引約束的MySQL表,重覆消費,MySQL唯一索引就要報錯,利用插入失敗的特性,阻止重覆消費。

RabbitMQ不公平分發

  • 不公平分發:推薦操作,避免一個消費者很忙,其它消費者不做任何工作的情況發生,防止壓力傾斜。
    可在每個消費者端調用basic_consume()方法之前配置,註意這種行為在消息堆積的情況下生效
    被設置不公平分發的消費者中,可從網頁端控制台->Channels選項卡->表格->所在行數據的Prefetch中看到。
/*
參數1:預取大小,通常情況下為null,表示不限制消息大小。
參數2:預取數量,表示消費者每次最多接收的消息數量,值為權重比例,可以為任意正整數。
參數3:是否將預取限制應用到channel級別(true)或者消費者級別(false)。
*/
$channel->basic_qos(null, 1, false);

RabbitMQ可靠消息機制(發佈確認)

  • 極簡概括:是用來保證消息不丟失的的重要功能,生產者將消息發送給MQ,MQ把數據保存在磁碟上之後,會返回生產者成功保存的反饋。需要生產者端的隊列以及消息持久化的前提,就是為了防止隊列或者小寫將要持久化的時候RabbitMQ出故障的間隙情況發生。
    隊列持久化、消息持久化、發佈確認3個因素加起來,才能保證消息不丟失。
    發佈確認模式有3種:

    • 單個確認:性能最低,生產一條消息確認一個,後面的消息排隊。
    • 批量確認:性能比單個確認好,但是出問題時,無法定位是那條消息。
    • 非同步確認:是性能與高可用性權衡的方案(很多服務端組件都有這樣的),利用回調函數來達到消息可靠性傳遞的。
  • 單個確認,發送100條數據,耗時0.337秒,大部分場景夠用,高併發場景除外。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->queue_declare('hello', false, true, false, false);


//開啟發佈確認模式
$channel->confirm_select();

$start = microtime(true);
for($i = 0; $i < 100; $i++) {
    $msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    $channel->basic_publish($msg, '', 'hello');
    //生產一條數據,發送一條確認消息,參數值為超時時間,單位:秒
    $channel->wait_for_pending_acks(10.000);
}
echo microtime(true) - $start; // 0.337秒

$cha

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 記錄一個HTML頁面關於高分屏的踩到的坑。 所謂高分屏,就是在同樣大小的屏幕面積上顯示更多的像素點,這樣可以呈現更好的可視效果的屏幕。例如,我的筆記本是15.6寸,理論上它的屏幕解析度應該是1920 x 1080像素,但實際上我的筆記本屏幕解析度確實2560 x 1440像素,也就是俗稱的2K屏。這 ...
  • 主題搗鼓日記 sakura版本(YYDS) 主要框架都沒怎麼動,功能挺完整的。但是如果要DIY,我建議還是得自己把代碼捋一遍,不然從哪改起都不知道,註釋有點用但不全。 搗鼓了兩天兩夜,還是有很多細節沒改好,main.js翻了四五遍,看評論區發現諸多細節還要改CSS文件,太難了。。前端都忘得差不多了, ...
  • 一、介紹 Promise,譯為承諾,是非同步編程的一種解決方案,比傳統的解決方案(回調函數)更加合理和更加強大 在以往我們如果處理多層非同步操作,我們往往會像下麵那樣編寫我們的代碼 doSomething(function(result) { doSomethingElse(result, functi ...
  • 寫在前面 tips:點贊 + 收藏 = 學會! 本文包含radash中數組相關的所有方法說明 + 使用示例 + 思維導圖查看 這邊會整理出一份數組相關方法的使用大綱(不含源碼解析),方便大家查閱使用; 作者會按照大類進行整理分享,本次也會同步給出Array所有方法的思維導圖; 所有方法整理完畢後,作 ...
  • 大家好,我前夕.最近,我分享了個人開發的微信讀書網頁插件,這個小項目意外收穫了眾多用戶的喜愛. 這讓我意識到技術作品能跨越專業界限,幫助到有需要的人.受此激勵,我決定對插件進行全面重構與優化 ...
  • 大家好,我是 Java陳序員。 今天,給大家介紹一個開源的聊天應用程式,支持PC端和移動端。 關註微信公眾號:【Java陳序員】,獲取開源項目分享、AI副業分享、超200本經典電腦電子書籍等。 項目介紹 HasChat —— 一個基於 Vue3 + Socket.io 的聊天應用,同時支持PC端和 ...
  • 一、什麼是雙向綁定 我們先從單向綁定切入單向綁定非常簡單,就是把Model綁定到View,當我們用JavaScript代碼更新Model時,View就會自動更新雙向綁定就很容易聯想到了,在單向綁定的基礎上,用戶更新了View,Model的數據也自動被更新了,這種情況就是雙向綁定舉個慄子 當用戶填寫表 ...
  • 透明傳輸,顧名思義,是指在傳輸過程中對外界完全透明,不需要關註傳輸過程以及傳輸協議,最終目的是要把傳輸的內容原封不動地傳遞給接收端,發送和接收的內容完全一致。在無線模塊中,透明傳輸通常是通過特定的技術和機制來實現的。 無線模塊透明傳輸的原理主要基於串口數據的透明傳輸。這種傳輸方式下,無線模塊被配置為 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...