理論(後半部分有實操詳解) 哲學思考 易經思維:向各國人講述一種動物叫烏龜,要學很久的各國語言,但是隨手畫一個烏龜,全世界的人都能看得懂。 道家思維:努力沒有用(指勞神費心的機械性重覆、肢体受累、刻意行為),要用心(深度思考、去感悟、透過現象看本質)才有用。 舉例:類似中學做不出來的幾何題的底層原理 ...
理論(後半部分有實操詳解)
哲學思考
- 易經思維:向各國人講述一種動物叫烏龜,要學很久的各國語言,但是隨手畫一個烏龜,全世界的人都能看得懂。
- 道家思維:努力沒有用(指勞神費心的機械性重覆、肢体受累、刻意行為),要用心(深度思考、去感悟、透過現象看本質)才有用。
- 舉例:類似中學做不出來的幾何題的底層原理:不是不知道xx定理或公式(招式),而是不知道畫輔助線的思路(內功)。
- 總結:萬事萬物、用道家思維思考本質與規律,用易經思維從眾多信息中簡化模型練出來的內功,叫覺悟(內力),而知識僅僅是外功(招式)。
做研發的,可能距離成功一步之遙,別因為一葉障目而放棄。
消息隊列與消息中間件
- 消息隊列:一種存儲消息的數據結構,消息隊列通常是一種先進先出(FIFO)的數據結構,可以確保消息的順序傳遞。類比Redis的List,先進先出類比lPush和rPop。
- 消息中間件:是管理消息傳遞的一種組件。功能包含消息隊列。
- 中間件:中間件是指位於客戶端(或調用端)和服務端之間的組件,用於協調、存儲、介面、管理組件之間的通信,類比賣家與買家之間的快遞驛站,不是一定要用,但是最好得有。
AMQP協議
AMQP(Advanced Message Queuing Protocol),是一種用於消息傳遞協議,類比成HTTP、TCP、UDP這種不同的協議就行。它定義了消息中間件客戶端與服務端(買家買家對驛站的溝通)的通信規則(怎麼運快遞),包括消息格式(什麼類型的快遞)、消息發佈(怎麼發快遞)、消息訂閱(怎麼收快遞)、隊列管理(怎麼處理快遞)等。
AMQP高效就高效在把通信(物流階段)能遇到的問題都解決了。
- 非同步通信:AMQP支持非同步通信,可以讓消息發送者和接收者在不同的時間和速度進行消息傳遞。
- 消息路由:AMQP協議定義了靈活的消息路由規則,可以根據消息內容自動將消息路由到指定的接收者。
- 消息確認:AMQP支持消息確認機制,確保消息被可靠地接收。
- 批量處理:AMQP協議支持批量消息處理,可以同時發送和接收多個消息,減少了網路通信的開銷和系統的資源消耗。
RabbitMQ
- 官方文檔:https://www.rabbitmq.com/docs
- 極簡概括:使用ErLang語言寫基於AMQP協議的C/S架構的消息中間件,用於不同組件之間高效的傳遞數據。
- 解決問題:
- 削峰:賣家一家伙運了幾十噸的貨(海量請求)、買家沒地方存、扛不住了,那就放驛站緩衝一下。
- 解耦:沒有驛站中轉,快遞送過來硬塞給買家,買家不在,這個流程就走不下去(耦合)。
- 非同步:賣家只要發貨、流程基本走完,剩下的流程交給物流和驛站(中間件),不影響賣家做其它事(非阻塞),買家也一樣、快遞放驛站、我忙我的。需要時再取(按需訂閱)。
- 捲:面試加分項,工作用到減輕負擔。只想捲死各位,或者被各位捲死。
- 適用場景
- 流量削峰:當系統抗不出海量的請求的時候,把MQ放置在用戶端與業務端之間(強制排隊)削去部分峰值流量(Nginx令牌桶和滴水演算法、或基於Redis也能實現),這個過程和加鎖的缺點差不多,性能會受影響。
- 應用解耦:小型項目用不上,大型項目中,庫存、訂單、支付、物流等各種模塊,為了防止硬關聯耦合度大,一節點掛掉其餘癱瘓,所以用MQ作為通信的橋梁。
- 非同步處理:用的最多,非同步發簡訊,發郵件、導出導出文件、延時任務、自動取消訂單、推送通知、回調重試等。
- 助力跳槽漲薪:只想捲死各位,或者被各位捲死。
- 賦能成就感:RabbitMQ都用上了,感覺離架構師不遠了(呵呵)。
- 優點:
- 應用場景就是優點,主流。
- 豐富的客戶端支持,支持PHP、Java、Python、Ruby、C、Golang、NodeJS、.NET、Objective-C、Swift。
- 缺點:
- 性能下降:流量削峰引起的性能下降。
- 運維成本:多引入一個組件,就要考慮它的運維成本,以及各項配置問題。
- 大數據處理:對於大數據、流數據、日誌分析、更適合Kafka,RabbitMQ性能會下降。
- 安裝困難:成熟的軟體安裝不方便,Erlang環境依賴強,其次是官網還把CentOS7的安裝去掉了。
- 同類產品:RocketMQ(僅支持Java和C++)、ActiveMQ(後期停止維護)、Kafka(大數據場景)、基於Redis實現的任務隊列(輕量級使用)、編程語言框架提供的支持(Laravel Queue)、AWSMQ(亞馬遜)、ApsaraMQ(阿裡巴巴)、PulsarMQ(Apache)。
工作架構圖
- Producer:生產者,類比賣家。
- Connection:客戶端與服務端的通信層。
- Channel:這裡叫通道,類比要發那個快遞,或從那家快遞取貨。
- Broker:用於接受和分發消息的處理,這是MQ Server要乾的事,類比驛站。
- Exchange:交換機(不是switch),指定規則路由到那個隊列,類比分配到那個貨架的方法。
- Consumer:消費者,類比買家。
- 補充:每次訪問RabbitMQ都需要建立一個連接,海量的請求對於這塊的開銷是絕大的,所以需要在channel與connection內部建立邏輯連接,從而減少性能損耗。
- 多租戶設計:每個Block裡面可以多個Vhost,Vhost裡面,可以有多個Exchange(交換機),每個Exchange可以有多個Queue。
四大概念的通俗理解
- 生產者:顧名思義生產數據的角色,類比賣家發快遞。
- 消費者:生產者產出的數據,給消費者使用,類比買家收快遞。
- 交換機:,用於接受生產者的消息,通過指定模式(4大模式)和路由鍵(交換機與隊列綁定標識符)分發給隊列,一個交換機可綁定多個隊列,將消息路由到多個隊列,類比快遞驛站的分發到那個貨架。
- 隊列:一種數據結構,存放消息隊列,類比快遞驛站的貨架。
- 流程:生產者(發快遞)->交換機(驛站怎麼分類)->隊列(驛站怎麼存)->消費者(拿快遞)。
Exchange(交換機)
- 極簡概括:位於生產者和隊列之間,用於接受生產者的消息,並分發給隊列,一個交換機可綁定多個隊列,將消息路由到多個隊列。
- 解決問題:生產者發一條消息,讓所有或指定消費者能夠收到(類似廣播)。如果沒有交換機機制,只會有一個消費者能收到此消息。
- 交換機4大類型:直接(direct)類型、主題(topic)類型、頭(headers)類型、扇出(fanout)類型(下文有詳解)。
- 補充:在常規模式,工作隊列模式(生產者端,basic_publish方法參數2為空字元串),也有一個預設類型交換機,或稱之為無名Exchange。
Routing Key(路由鍵)
- 極簡概括:交換機綁定隊列的標識符,一個交換機,可以有多個Routing key。
- 解決問題:起個名用於區分,方便對不同隊列進行不同的操作,就像MySQL表id作用一樣。
死信
無法被消費的消息。
死信隊列
- 極簡概括:隊列中的消息無法被消費,若沒有後續的處理,就成了死信隊列。
- 解決問題:將消費異常的數據放入死信隊列,用於存儲消費失敗或者異常時的情況,確保失敗的消息能夠得到適當的處理(重試或由開發者調試查看用)。可以簡單的理解為找個地方存失敗的消費任務。也可將計就計,利用某些特性作為延時隊列使用。
- 產生原因:
- 消息TTL過期。
- 隊列滿了。
- 消息被拒絕(basic reject 或basic nack),並且requeue為false。
有Redis List去實現消息隊列,為什麼要RabbitMQ?
- 持久化問題:RabbitMQ支持持久化,Redis雖然也支持持久化,但只要不是每次操作都持久化,那麼就有丟失數據的風險。
- 消息應答問題:消息處理成功與失敗,Redis用隊列無法記錄,任務消息只會取一個少一個,而RabbitMQ可以。
- 故障轉移問題:Redis哨兵機制、主從複製,是針對緩存高可用,做消息中間件有局限性。RabbitMQ支持消息重新入隊。如果某個消費者由於某些原因失去連接,導致消息未發送ACK確認,那麼RabbitMQ有讓消息重新排隊的機制,如果此時其它消費者可以處理,那就讓其它消費者處理。
- 支持消息優先順序:RabbitMQ支持消息優先順序,而Redis不支持。
- 廣播支持:RabbitMQ支持廣播,等指定隊列發送,而Redis不支持。
- 路由轉發:RabbitMQ通過交換機機制,支持設定不同的分發隊列規則,滿足各個場景,而Redis List需要手動實現這塊內部機制。
有Redis Sorted Set、或者過期監聽去實現延時隊列,為什麼要RabbitMQ?
RabbitMQ是推模式還是拉模式?
都有,生產者發數據到MQ是推,消費者消費消息是拉。
通信方案選擇Push還是Pull?
推或拉是兩種通信的方向選擇,跟MQ無關,但是類似MQ,順便提一下。
個人認為:
- 看業務場景,拋開業務場景談架構都是耍流氓
- 要求實時性的,就選擇推送,輪詢耗費網路資源,調用端或客戶端每次請求,服務端都得執行一次,尤其是併發量大或者響應流程任務重的場景。
- 不需要實時性的,就拉取,減低耦合,服務端就純粹的產生服務端的數據就行,客戶端或調用端誰想拉取讓它自己來。
- 看改動開銷
- 保證工程質量的前提下,那種方式開銷小,技術老大說用哪個,或者同事們習慣那一種,就用哪一種。
- 看調用端可信任性。
- 各種鑒權,驗證是一方面,數據傳輸也是一方面,對於不信任的平臺,白推送的數據,與對方直接請求獲取。還是有區別的。
RabbitMQ可以直連隊列嗎?
RabbitMQ內部不可以直連隊列,但是操作上可以直連隊列。
就算是常規(Hello World)模式,沒有聲明交換機,也會經過一個預設交換機。
不過這樣喪失了交換機靈活的路由分發功能,適用於簡單的場景。
實操
安裝
Docker安裝
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
瀏覽器訪問:http://192.168.xxx.xxx:15672
普通安裝
CentOS7的安裝RabbitMQ的教程,已經被官網刪除了,支持CentOS8,CentOS需要藉助外力。
安裝Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum -y install erlang
安裝RabbitMQ
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install rabbitmq-server
開啟服務,並設置開機自啟
systemctl start rabbitmq-server
systemctl enable rabbitmq-server
檢查狀態
systemctl status rabbitmq-server
啟動網頁端控制台
rabbitmq-plugins enable rabbitmq_management
開啟防火牆
firewall-cmd --zone=public --add-port=80/tcp --permanent
systemctl restart firewalld
新建網頁端登錄用戶,並配置角色與許可權
rabbitmqctl add_user admin 12345678
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl set_permissions -p <virtual_host> <username> <configure_permission> <write_permission> <read_permission>
-p <virtual_host>:指定虛擬主機的名稱,例如/(預設虛擬主機)。
<username>:要為其設置許可權的用戶名。
<configure_permission>:配置許可權的正則表達式。允許用戶對隊列、交換機等進行配置。
<write_permission>:寫許可權的正則表達式。允許用戶發佈消息。
<read_permission>:讀許可權的正則表達式。允許用戶獲取消息。
瀏覽器訪問
http://192.168.xxx.xxx:15672
用戶名admin,密碼12345678
命令行常用命令
systemctl start/stop/restart/status/enable rabbitmq-server # RabbitMQ Server開啟、關停、重啟、狀態查看、開機自啟
rabbitmq-plugins enable 插件名 # RabbitMQ Server安裝插件
rabbitmq-plugins list # 插件列表
rabbitmqctl version # 查看RabbitMQ Server版本
rabbitmqctl list_exchanges # 查看交換機列表
rabbitmqctl list_queues # 查看隊列列表
rabbitmqctl list_bindings # 查看綁定列表
PHP實現RabbitMQ Client
- RabbitMQ6大模式官方教程:https://www.rabbitmq.com/tutorials
- 官方擴展(不用):https://pecl.php.net/package/amqp/1.11.0/windows
這個擴展官方下載地址有最後一版Windows系統的php_amqp.dll的下載地址,(用Windows是為了方便,在CentOS上還需要編譯,改完PHP代碼每次需要重新上傳,不想費事),但是我使用報錯,所以廢棄了。 - 官方推薦:composer require php-amqplib/php-amqplib
PHP操作RabbitMQ思路並不複雜,有6種工作模式,翻來覆去就是研究消息怎麼發,發到哪裡,怎麼處理消息的問題。 - Laravel框架:可以使用php-amqplib/php-amqplib,去操作,也可以使用現成的laravel-queue-rabbitmq去操作。composer require vladimir-yuldashev/laravel-queue-rabbitmq
消費者常駐進程偏好
while (count($channel->callbacks)) {
$channel->wait();
}
//或者
$channel->consume();
常規模式(Hello World!)
- 極簡概括:生產者生產消息給消費者。
- 解決問題:跨進程,或跨組件、跨網路通信,適用與兩個角色,但是一個PHP進程無法完成的時候用。
- 備註:用MySQL、Redis也能實現。
- 生產者端代碼:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//初始化連接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化通道
$channel = $connection->channel();
/*
參數1:隊列名
參數2:在聲明隊列時指定是否啟用passively模式,passively模式用於檢查隊列是否存在,而不是實際創建一個新隊列。如果隊列不存在,則會返回一個通知,而不會創建新隊列。
參數3:指定隊列的持久性。在這裡,它是false,表示隊列不是持久的。如果設置為true,則隊列將在伺服器重啟時或宕機後保留下來。
參數4:指定隊列的排他性。如果設置為 true,則該隊列只能被聲明它的連接使用,一般用於臨時隊列。false表示隊列不是排它的。
參數5:指定隊列的自動刪除,如果設置為 true,則在隊列不再被使用時將自動刪除。在這裡,它是 false,表示隊列不會自動刪除。
*/
$channel->queue_declare('hello', false, false, false, false);
//編輯消息
$msg = new AMQPMessage('Hello World!');
//發送消息,交換機用不上,所以留空。這方法沒有返回值
$channel->basic_publish($msg, '', 'hello');
//用完了就關閉,釋放資源
$channel->close();
$connection->close();
- 消費者端代碼
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//初始化連接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化通道
$channel = $connection->channel();
/*
參數1:隊列名
參數2:在聲明隊列時指定是否啟用passively模式,passively模式用於檢查隊列是否存在,而不是實際創建一個新隊列。如果隊列不存在,則會返回一個通知,而不會創建新隊列。
參數3:指定隊列的持久性。在這裡,它是false,表示隊列不是持久的。如果設置為true,則隊列將在伺服器重啟後保留下來。
參數4:指定隊列的排他性。如果設置為 true,則該隊列只能被聲明它的連接使用,一般用於臨時隊列。false表示隊列不是排它的。
參數5:指定隊列的自動刪除,如果設置為 true,則在隊列不再被使用時將自動刪除。在這裡,它是 false,表示隊列不會自動刪除。
*/
$channel->queue_declare('hello', false, false, false, false);
/*
參數1:隊列名稱
參數2:這是消費者標簽(consumer tag),用於唯一標識消費者。在這裡,它是空字元串,表示不為消費者指定任何特定的標簽。
參數3:如果設置了無本地欄位,則伺服器將不會向發佈消息的連接發送消息。
參數4:是指定是否自動確認消息(auto-ack)。設置為true則表示消費者在接收到消息後會立即確認消息,RabbitMQ將會將消息標記為已處理並從隊列中刪除。false表示消費者會手動確認消息,即在處理消息後,通過調用 $channel->basic_ack($deliveryTag) 手動確認消息。
參數5:指定是否獨占消費者。如果設置為true,則表示只有當前連接能夠使用該消費者。在這裡,它是true,表示只有當前連接可以使用這個消費者。
參數6:如果設置了,伺服器將不會對該方法作出響應。客戶端不應等待答覆方法。如果伺服器無法完成該方法,它將引發通道或連接異常。
參數7:回調參數,拿到數據怎樣處理。
*/
$channel->basic_consume('hello', '', false, true, false, false, function ($msg) {
echo $msg->body;
});
//通過死迴圈持久化當前進程,實時消費
$channel->consume();
工作隊列模式(Work Queues)
- 極簡概括:類比Redis的lPush、Rpop。但是RabbitMQ可以針對一個隊列有多個消費者,但一條消息,只能被一個消費者消費一次,不能多次消費。為了避免消費者處理數據傾斜問題(有的隊列處理任務多,有的處理的少),所以使用了輪詢的方式,挨個處理任務。
- 解決問題:耗時且不需要串列執行的任務,可以丟給隊列,例如發簡訊、郵件、大量數據導入導出。
- 測試普通用法
生產者代碼,在cli模式下,依次輸入1~10,執行10次
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
//獲取命令行參數
$msg = new AMQPMessage($argv[1]);
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
消費者1代碼,cli模式下運行,依次返回1、3、5、7、9,可見RabbitMQ不管消費節點處理的時間,只會根據消費者數量輪詢處理,哪怕其中任意幾個隊列任務重,其它隊列任務輕鬆。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, true, false, false,
function ($msg) {
echo "收到消息,內容為{$msg->getBody()}\n";
sleep(5);
echo "成功處理消息\n";
}
);
$channel->consume();
消費者2代碼,(與消費者1代碼唯一不同的,就是sleep函數的時間),cli模式下運行,依次返回2、4、6、8、10,可見RabbitMQ不管消費節點處理的時間,只會根據消費者數量輪詢處理,哪怕其中任意幾個隊列任務重,其它隊列任務輕鬆。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->basic_consume('hello', '', false, true, false, false,
function ($msg) {
echo "收到消息,內容為{$msg->getBody()}\n";
sleep(10);
echo "成功處理消息\n";
}
);
$channel->consume();
發佈訂閱模式(Pub/Sub)
- 極簡概括:生產者生產數據,所有隊列關聯的消費者都接收(類似廣播),使用交換機的扇出(Fanout)模式實現。
- 解決問題:聊天室的功能,可以由它實現。
生產者代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
/**
參數1:交換機名稱。
參數2:交換機類型,這裡是扇出。
參數3:當passive參數設置為true時,表示不會實際創建新的交換機或隊列,而是用來檢查已經存在的交換機或隊列是否已經存在。如果存在,則返回成功,如果不存在,則返回失敗。passive參數主要用於檢查交換機或隊列是否存在,而不是實際創建新的實體
參數4:交換機是否持久化,即當RabbitMQ伺服器重啟時,交換機會不會被重新創建。
參數5:當所有綁定的隊列都與交換機解綁後,是否自動刪除交換機。
*/
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
$msg = new AMQPMessage('扇出測試');
//發送給指定的交換機
$channel->basic_publish($msg, 'fanout_test');
$channel->close();
$connection->close();
消費者1代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//創建臨時隊列,用於接受隊列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊列綁定指定的交換機
$channel->queue_bind($queue_name, 'fanout_test');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
消費者2代碼(同1):
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機初始化
$channel->exchange_declare('fanout_test', 'fanout', false, false, false);
//創建臨時隊列,用於接受隊列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊列綁定指定的交換機
$channel->queue_bind($queue_name, 'fanout_test');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
路由模式(Routing)
- 極簡概括:通過路由鍵,將消息發送給指定的(任意數量)消費者,一個消費者可配置多個路由鍵。
- 解決問題:只發送部分消費者。
生產端代碼,只讓消費者1消費。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('direct_test', 'direct', false, false, false);
$msg = new AMQPMessage('扇出測試');
//發送給指定的交換機,並指定路由鍵
$channel->basic_publish($msg, 'direct_test', 'consumer1');
$channel->close();
$connection->close();
消費者1代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//創建臨時隊列,用於接受隊列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊列綁定指定的交換機,並聲明路由鍵
$channel->queue_bind($queue_name, 'direct_test', 'consumer1');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
消費者2代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//交換機初始化
$channel->exchange_declare('direct_test', 'direct', false, false, false);
//創建臨時隊列,用於接受隊列的消息
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];
//隊列綁定指定的交換機,並聲明路由鍵
$channel->queue_bind($queue_name, 'direct_test', 'consumer2');
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
主題模式(Topics)
- 極簡概括:生產者獲取消息,編寫指定的匹配規則,使被匹配的隊列的消費者能夠消費消息。
- 解決問題:實現更加靈活的,消息分發模式。
- 備註:主題模式下的路由鍵,多個標識用多個點將其分開(例如aaa.bbb.ccc),最長不超過255個字元。支持通配符,*匹配一個標識(奇葩設計,和#就不應該反過來嗎),#匹配任意個標識。
通配符用法*.*.xxx,表示xxx結尾的路由鍵,*.xxx.*表示包含xxx的路由鍵。a.#可以匹配a,也可以匹配,a.b。
對應的,當一個隊列綁定的鍵是#,那就類似於fanout模式,如果一個隊列中沒有任何通配符,那就類似於direct模式。
生產者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明隊列類型為主題
$channel->exchange_declare('topic_test', 'topic', false, false, false);
$msg = new AMQPMessage('topic測試數據');
/*
以下路由鍵可以接受到消息
a.b
a.*.*
a.*.*.*
#.z
z
a.x.y.z
abc.z
*/
$arr = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z', 'x', 'y', 'z', 'a', 'ab', 'ac', 'ad','a.x.y.z', 'abc.z'];
foreach($arr as $v) {
$channel->basic_publish($msg, 'topic_logs', $v);
}
$channel->close();
$connection->close();
消費者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明topic模式的交換機
$channel->exchange_declare('topic_test', 'topic', false, false, false);
//創建臨時隊列
$queue_name = $channel->queue_declare("", false, false, true, false)[0];
$binding_keys = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z'];
//綁定多個路由鍵
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}
$callback = function ($msg) {
echo 'RoutingKey:', $msg->getRoutingKey(), ' --- Msg:', $msg->getBody(), "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
try {
$channel->consume();
} catch (\Throwable $exception) {
echo $exception->getMessage();
}
$channel->close();
$connection->close();
遠程過程調用模式(RPC)
- 極簡概括:(對於PHP,RPC用的不多,中間件RPC用的更少)使得客戶端可以像調用本地方法一樣調用遠程方法,同時隱藏了底層網路通信細節。
調用端代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RpcClient {
private $connection;
private $channel;
private $queue_name;
private $response;
private $corr_id;
public function __construct() {
$this->connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$this->channel = $this->connection->channel();
$this->queue_name = $this->channel->queue_declare("", false, false, true, false)[0];
$this->channel->basic_consume($this->queue_name, '', false, true, false, false, array($this, 'onResponse'));
}
public function onResponse($rep) {
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
public function call($str) {
$this->response = null;
$this->corr_id = uniqid();
$this->channel->basic_publish(new AMQPMessage($str, [
'correlation_id' => $this->corr_id,
'reply_to' => $this->queue_name
]), '', 'rpc_queue');
while (! $this->response) {
$this->channel->wait();
}
return $this->response;
}
}
$fibonacci_rpc = new RpcClient();
$response = $fibonacci_rpc->call('客戶端向服務端發送數據');
echo $response, "\n";
服務端代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
$callback = function ($req) {
echo $req->getBody(), "\n";
$msg = new AMQPMessage('服務端成功接收:'. $req->getBody(), ['correlation_id' => $req->get('correlation_id')]);
$req->getChannel()->basic_publish($msg, '', $req->get('reply_to'));
$req->ack();
};
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
死信隊列(TTL過期)
- 極簡概括:消息超時未處理,會被放置死信隊列中。
- 備註:註釋很清楚,這需要先執行這段代碼(創建普通與死信,交換機、隊列、和路由鍵),然後再關閉(模擬消費者掛掉,10秒後,此時超時的消息就會存入死信隊列,至於死信的隊列的數據,是重試,還是給開發者提示,看產品需求)。
- 現象:流程走完後,登錄控制台,到Queues選項卡,查看隊列列表的Ready列,原先的normal_queue隊列由原先的10變成了0(執行一次生產者代碼,裡面有一個for迴圈),而dead_queue裡面的Ready,由原先的0,變成了10(10個消息全部超時,到了死信隊列)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明普通交換機名稱
$normal_exchange = 'normal_exchange';
//聲明死信交換機名稱
$dead_exchange = 'dead_exchange';
//聲明普通隊列名稱
$normal_queue = 'normal_queue';
//聲明死信隊列名稱
$dead_queue = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key = 'dead_routing_key';
//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通隊列異常時,轉發給死信隊列的荷載參數,就指望著這個普通隊列有問題了,才會把消息數據轉發到死信隊列,轉發到那裡,肯定是要配置的。
$payload = new AMQPTable();
//設置消息生存時間為15秒
$payload->set('x-message-ttl', 10000);
//定位普通隊列出異常了,要轉發的交換機
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉發的交換機還不夠,還得知道那個隊列,不然交換機不知道路由那個消息到達那個隊列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//聲明普通隊,就是等普通隊列出問題了,才把數據丟給死信隊列,所以普通(註意是普通隊列)隊列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊列,其實死信隊列本身是一個普通隊列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//綁定普通交換機與普通隊列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機與死信隊列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
echo $msg->getBody(), "\n";
});
//常駐進程
$channel->consume();
$channel->close();
$connection->close();
然後再執行生產者代碼,模擬發消息。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0; $i< 10; $i++) {
$msg = new AMQPMessage($i, [
//配置過期時間為10秒,讓生產者控制過期時間
'expiration' => '10000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
極簡死信隊列消費示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
死信隊列(隊列達到最大長度放不下)
- 極簡概括:生產的消息數量超過了消費者端設置了最大長度,剩餘的消息會被放入死信隊列。
- 註意:本次測試,需要把原先的隊列刪除,否則隊列長度設置不生效。
- 現象:流程走完後,登錄控制台,到Queues選項卡,查看隊列列表的Ready列,原先的normal_queue隊列由原先的0變成了8(執行一次生產者代碼,裡面有一個for迴圈),而dead_queue裡面的Ready,由原先的0,變成了2(10 - 8)。
消費者端代碼,這一步是為了初始化普通和死信交換機、隊列、路由鍵,並且需要執行後Ctrl + C強制停止,保證生產者生產的消息,不被能正常的消費(不然怎麼演示不正常現象時的死信隊列?)。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明普通交換機名稱
$normal_exchange = 'normal_exchange';
//聲明死信交換機名稱
$dead_exchange = 'dead_exchange';
//聲明普通隊列名稱
$normal_queue = 'normal_queue';
//聲明死信隊列名稱
$dead_queue = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key = 'dead_routing_key';
//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通隊列異常時,轉發給死信隊列的荷載參數,就指望著這個普通隊列有問題了,才會把消息數據轉發到死信隊列,轉發到那裡,肯定是要配置的。
$payload = new AMQPTable();
//設置最多存儲8條消息
$payload->set('x-max-length', 8);
//定位普通隊列出異常了,要轉發的交換機
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉發的交換機還不夠,還得知道那個隊列,不然交換機不知道路由那個消息到達那個隊列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//聲明普通隊,就是等普通隊列出問題了,才把數據丟給死信隊列,所以普通(註意是普通隊列)隊列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊列,其實死信隊列本身是一個普通隊列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//綁定普通交換機與普通隊列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機與死信隊列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
echo $msg->getBody(), "\n";
});
//常駐進程
$channel->consume();
$channel->close();
$connection->close();
生產者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0; $i< 10; $i++) {
$msg = new AMQPMessage($i, [
//配置過期時間為10秒,讓生產者控制過期時間
'expiration' => '10000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
極簡死信隊列消費示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
死信隊列(消息被拒絕,且requeue為false)
- 極簡概括:消費者拒絕了消息,並且將requeue為false,的消息,會被放入死信隊列。
- 註意:本次測試,需要把原先的隊列刪除,避免影響。
- 現象:流程走完後,登錄控制台,到Queues選項卡,查看隊列列表的Ready列,原先的normal_queue隊列由原先的0變成了0(執行一次生產者代碼,裡面有一個for迴圈),而dead_queue裡面的Ready,由原先的0,變成了3(10 - 7,消費者的代碼中,有if判斷)。
這次讓消費者正常消費代碼即可,不用Ctrl + C強制中斷,正常接受生產者者的數據。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明普通交換機名稱
$normal_exchange = 'normal_exchange';
//聲明死信交換機名稱
$dead_exchange = 'dead_exchange';
//聲明普通隊列名稱
$normal_queue = 'normal_queue';
//聲明死信隊列名稱
$dead_queue = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key = 'dead_routing_key';
//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通隊列異常時,轉發給死信隊列的荷載參數,就指望著這個普通隊列有問題了,才會把消息數據轉發到死信隊列,轉發到那裡,肯定是要配置的。
$payload = new AMQPTable();
//定位普通隊列出異常了,要轉發的交換機
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉發的交換機還不夠,還得知道那個隊列,不然交換機不知道路由那個消息到達那個隊列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//聲明普通隊,就是等普通隊列出問題了,才把數據丟給死信隊列,所以普通(註意是普通隊列)隊列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊列,其實死信隊列本身是一個普通隊列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//綁定普通交換機與普通隊列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機與死信隊列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, false, false, false, function($msg) {
if($msg->getBody() > 6) {
//手動拒絕消息,不批量,且不讓重入隊列
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, false);
} else {
//手動確認消息
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
});
//常駐進程
$channel->consume();
$channel->close();
$connection->close();
生產者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0; $i< 10; $i++) {
$msg = new AMQPMessage($i, [
//配置過期時間為10秒,讓生產者控制過期時間
'expiration' => '10000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
極簡死信隊列消費示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg->getBody();
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
延時隊列(基於TTL過期式的死信隊列,不推薦使用)
- 極簡概括:生產出來的隊列任務,不會立馬消費,而是等到指定時間。
- 解決問題:自動確認訂單,自動取消未支付訂單,等。
- 原理分析:常規的生產和消息鏈路配置了死信隊列的兜底機制,若普通隊列因TTL過期,會自動把消息放入死信隊列,利用這個特性,可以做出來延遲隊列,延時隊列的延遲機制,就是普通隊列的TTL過期時間,延時任務的處理機制,就是消費死信隊列的代碼段。因此可以專門選一個死信隊列,作為延時隊列來用。
- 註意:
- 例如延遲10秒,並不能保證一定會在第10秒被處理,會有小誤差(絕大部分業務場景能接受)。
- 延遲消息是排隊處理的,第一個延遲10秒,第二個消息延遲5秒,預設情況下,第二個會延遲15秒,因為消息是排隊(隊列先進先出)處理的,所以這個情況需要優化,這是RabbitMQ死信隊列實現延時隊列的巨大缺陷。
擴展:利用Redis的sorted set也可以作為延時隊列,key為唯一標識符,score為執行的時間的時間戳,val為要執行的序列化代碼,用while(true)起一個常駐進程,用於消費當前時間戳下的任務,如果要添加任務,就網Zset中添加數據。常駐進程到時間了,就會消費。
初始化普通、死信交換機、隊列、路由鍵。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
//聲明普通交換機名稱
$normal_exchange = 'normal_exchange';
//聲明死信交換機名稱
$dead_exchange = 'dead_exchange';
//聲明普通隊列名稱
$normal_queue = 'normal_queue';
//聲明死信隊列名稱
$dead_queue = 'dead_queue';
//聲明普通路由鍵
$normal_routing_key = 'normal_routing_key';
//聲明死信路由鍵
$dead_routing_key = 'dead_routing_key';
//聲明普通交換
$channel->exchange_declare($normal_exchange, 'direct', false, false);
//聲明死信交換機
$channel->exchange_declare($dead_exchange, 'direct', false, false);
//配置普通隊列異常時,轉發給死信隊列的荷載參數,就指望著這個普通隊列有問題了,才會把消息數據轉發到死信隊列,轉發到那裡,肯定是要配置的。
$payload = new AMQPTable();
//定位普通隊列出異常了,要轉發的交換機
$payload->set('x-dead-letter-exchange', $dead_exchange);
//定位了要轉發的交換機還不夠,還得知道那個隊列,不然交換機不知道路由那個消息到達那個隊列
$payload->set('x-dead-letter-routing-key', $dead_routing_key);
//聲明普通隊,就是等普通隊列出問題了,才把數據丟給死信隊列,所以普通(註意是普通隊列)隊列,要額外的配置。
$channel->queue_declare($normal_queue, false, false, false, false, false, $payload);
//聲明死信隊列,其實死信隊列本身是一個普通隊列。
$channel->queue_declare($dead_queue, false, false, false, false);
echo '正在等待接受消息...';
//綁定普通交換機與普通隊列
$channel->queue_bind($normal_queue, $normal_exchange, $normal_routing_key);
//綁定死信交換機與死信隊列
$channel->queue_bind($dead_queue, $dead_exchange, $dead_routing_key);
$channel->basic_consume($normal_queue, '', false, true, false, false, function($msg) {
});
生產者代碼,產生延時任務
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->exchange_declare('normal_exchange', 'direct', false, false, true);
for($i = 0;$i < 10; $i++) {
$msg = new AMQPMessage(microtime(true), [
'expiration' => '3000'
]);
$channel->basic_publish($msg, 'normal_exchange', 'normal_routing_key');
}
$channel->close();
$connection->close();
延時任務處理代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
$consumer_time = microtime(true);
$product_time = $msg->getBody();
echo '時間處理誤差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('dead_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
延時隊列(基於RabbitMQ延遲交換機插件,推薦使用)
- 官方插件集合地址:https://www.rabbitmq.com/community-plugins
- rabbitmq-delayed-message-exchange版本地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 版本匹配:
rabbitmqctl version
發現是3.10.0,rabbitmq-delayed-message-exchange release頁也有說明:This release has no functional changes but lists RabbitMQ 3.10.x as supported in plugin metadata. - 實測延時誤差在5毫秒以內,滿足99.9%的應用場景。
- 安裝插件
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
發現如下字樣,就說明安裝成功。
Enabling plugins on node rabbit@lnmp:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@lnmp...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
systemctl restart rabbitmq-server
生產者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$delay_exchange = 'delay_exchange';
$delay_queue = 'delay_queue';
$delay_routing_key = 'delay_routing_key';
//聲明交換機類型為direct
$payload = new AMQPTable();
$payload->set('x-delayed-type', 'direct');
//聲明延時隊列交換機,參數2的類型,是安裝插件後才有的,固定值
$channel->exchange_declare($delay_exchange, 'x-delayed-message', false, false, true, false, false, $payload);
//聲明一個自定義延遲隊列
$channel->queue_declare($delay_queue, false, false, false, false);
//隊列綁定交換機
$channel->queue_bind($delay_queue, $delay_exchange, $delay_routing_key);
//發送延遲消息
$msg = new AMQPMessage(microtime(true), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
//這裡配置超時時間,固定格式。
'application_headers' => new AMQPTable(['x-delay' => 5000])
]);
$channel->basic_publish($msg, $delay_exchange, $delay_routing_key);
$channel->close();
$connection->close();
消費者代碼
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
$consumer_time = microtime(true);
$product_time = $msg->getBody();
echo '時間處理誤差:', bcsub($consumer_time, $product_time, 4), "\n";
};
$channel->basic_consume('delay_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
延時隊列提前執行或延後執行的方案
某些業務場景,可能需要提前觸發,或者延期處理,這就需要一些外的操作,才能完成。
隊列優先順序
- 不同的隊列和可以設置不同的優先順序。
- 在堆積的情況下才生效,否則生產一個消息處理一個,無阻塞,就不存在多個消息的優先處理的問題。
- 要設置優先順序,那麼隊列和消息都需要設置優先順序。
- 優先順序越大,越先被處理。
- 支持1到255之間的優先順序,強烈建議使用1到5之間的值,更高的優先順序值需要更多的CPU和記憶體資源,因為RabbitMQ需要在內部為每個優先順序維護一個子隊列,從1到為給定隊列配置的最大值。
首先啟動生產者代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$test_exchange = 'test_exchange';
$test_queue = 'test_queue';
$test_routing_key = 'test_routing_key';
$channel->exchange_declare($test_exchange, 'direct', false, false, true, false, false);
$payload = new AMQPTable();
$payload->set('x-max-priority', 20);
//聲明一個自定義延遲隊列
$channel->queue_declare($test_queue, false, false, false, false, false,$payload);
//隊列綁定交換機
$channel->queue_bind($test_queue, $test_exchange, $test_routing_key);
for($i =0; $i < 10; $i++) {
$msg = new AMQPMessage($i, [
//為每個消息設置不同的優先順序
'priority' => $i,
]);
$channel->basic_publish($msg, $test_exchange, $test_routing_key, true);
}
$channel->close();
$connection->close();
消費者代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$callback = function ($msg) {
echo $msg . "\n";
};
$channel->basic_consume('test_queue', '', false, true, false, false, $callback);
$channel->consume();
$channel->close();
$connection->close();
惰性隊列
惰性隊列用的極少,因為是存儲在磁碟中的。在消費者掛掉的情況下,避免RabbitMQ積累了太多的消息,消耗記憶體去採取的策略。
高可用
性能與高可用權衡的架構思維
加了RabbitMQ組件可以看做是分散式系統,分散式系統有有一個CAP定理,CAP只能同時滿足兩個。
對於RabbitMQ消息丟失,或者重覆消費的問題,若當前需求不能容忍,就需要額外的環節來彌補。如果當前需求(小概率)能容忍,去掉高可用環節,那麼在性能上就會提升。
例如支付寶,支付時總是會延遲幾秒鐘,一是走支付寶安全風控系統,二是要極致要求穩定高可用的環節必然性能下降,三是等支付回調,然而其它介面內容卻是瞬間載入。
所以沒有最好的架構,只有最合適的架構,所以要結合業務場景,判斷業務容忍度下限,與開發複雜度,在高可用與性能之間權衡,這是架構師必備思維。
如何避免RabbitMQ消息丟失?
- 百分99.9%投遞策略:事先要處理的消息綁一個唯一標識(讓RabbitMQ和MySQL自始至終都能知道是哪一個,避免因業務需求生產兩份同樣的消息區分不出來),先寫入MySQL,並預定一個未消費的預設狀態,直到收到RabbitMQ的確認通知,改變狀態。再加一個迴圈定時任務(定時的周期看業務需求容忍度與組件性能),超時未處理的消息,讓定時任務手動觸發。
但這種也不是沒有弊端,如果RabbitMQ的確認通知由於網路抖動沒有發出去,那麼定時任務就會讓其重覆消費。 - 生產消息傳輸感知:RabbitMQ掛了,生產消息發送給MQ時,就會超時,報錯,此時把消息放入MySQL做兜底(可以臨時不處理,但是別丟),避免消息丟失。
- RabbitMQ自帶的交換機、隊列、消息持久化機制。
- RabbitMQ自帶的發送確認、消費確認、事務隊列機制。
如何防止RabbitMQ重覆消費?
- 兜底策略:在數據集層上最好有個兜底策略,如唯一索引,更新數據前的狀態機判斷等,避免由於程式設計疏忽或者故障等原因導致的重覆消費。
- 上游生產防重:消息重覆消費,一方面是消息重覆發送引起的,一般在介面處理的上游,會有一些防重策略(數據冪等),上游杜絕,也是一種策略。
- 消費端上游判斷:首先把要處理的消息綁一個唯一標識(讓RabbitMQ和MySQL自始至終都能知道是哪一個,避免因業務需求生產兩份同樣的消息區分不出來),消費者獲取到消息時,再把唯一標識寫入具有唯一索引約束的MySQL表,重覆消費,MySQL唯一索引就要報錯,利用插入失敗的特性,阻止重覆消費。
RabbitMQ不公平分發
- 不公平分發:推薦操作,避免一個消費者很忙,其它消費者不做任何工作的情況發生,防止壓力傾斜。
可在每個消費者端調用basic_consume()方法之前配置,註意這種行為在消息堆積的情況下生效。
被設置不公平分發的消費者中,可從網頁端控制台->Channels選項卡->表格->所在行數據的Prefetch中看到。
/*
參數1:預取大小,通常情況下為null,表示不限制消息大小。
參數2:預取數量,表示消費者每次最多接收的消息數量,值為權重比例,可以為任意正整數。
參數3:是否將預取限制應用到channel級別(true)或者消費者級別(false)。
*/
$channel->basic_qos(null, 1, false);
RabbitMQ可靠消息機制(發佈確認)
-
極簡概括:是用來保證消息不丟失的的重要功能,生產者將消息發送給MQ,MQ把數據保存在磁碟上之後,會返回生產者成功保存的反饋。需要生產者端的隊列以及消息持久化的前提,就是為了防止隊列或者小寫將要持久化的時候RabbitMQ出故障的間隙情況發生。
隊列持久化、消息持久化、發佈確認3個因素加起來,才能保證消息不丟失。
發佈確認模式有3種:- 單個確認:性能最低,生產一條消息確認一個,後面的消息排隊。
- 批量確認:性能比單個確認好,但是出問題時,無法定位是那條消息。
- 非同步確認:是性能與高可用性權衡的方案(很多服務端組件都有這樣的),利用回調函數來達到消息可靠性傳遞的。
-
單個確認,發送100條數據,耗時0.337秒,大部分場景夠用,高併發場景除外。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();
$channel->queue_declare('hello', false, true, false, false);
//開啟發佈確認模式
$channel->confirm_select();
$start = microtime(true);
for($i = 0; $i < 100; $i++) {
$msg = new AMQPMessage($i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', 'hello');
//生產一條數據,發送一條確認消息,參數值為超時時間,單位:秒
$channel->wait_for_pending_acks(10.000);
}
echo microtime(true) - $start; // 0.337秒
$cha