說到隊列的話,大家一定不會陌生,但是扯到優先順序隊列的話,還是有一部分同學是不清楚的,可能是不知道怎麼去實現吧,其實呢,,,這東西已 經爛大街了。。。很簡單,用“堆”去實現的,在我們系統中有一個訂單催付的場景,我們客戶的客戶在tmall,taobao下的訂單,taobao會及時將訂單推送給 我們,如果 ...
說到隊列的話,大家一定不會陌生,但是扯到優先順序隊列的話,還是有一部分同學是不清楚的,可能是不知道怎麼去實現吧,其實呢,,,這東西已
經爛大街了。。。很簡單,用“堆”去實現的,在我們系統中有一個訂單催付的場景,我們客戶的客戶在tmall,taobao下的訂單,taobao會及時將訂單推送給
我們,如果在用戶設定的時間內未付款那麼就會給用戶推送一條簡訊提醒,很簡單的一個功能對吧,但是,tmall商家對我們來說,肯定是要分大客戶和小客
戶的對吧,比如像施華蔻,百雀林這樣大商家一年起碼能夠給我們貢獻幾百萬,所以理應當然,他們的訂單必須得到優先處理,而曾今我們的後端系統是使
用redis來存放的定時輪詢,大家都知道redis只能用List做一個簡簡單單的消息隊列,並不能實現一個優先順序的場景,所以訂單量大了後採用rabbitmq進行
改造和優化,如果發現是大客戶的訂單給一個相對比較高的優先順序,否則就是預設優先順序,好了,廢話不多說,我們來看看如何去設置。
一:優先順序隊列
既然是優先順序隊列,那麼必然要在Queue上開一個口子貼上一個優先順序的標簽,為了看怎麼設置,我們用一下rabbitmq的監控UI,看看這個裡面是如何
手工的創建優先順序隊列。
從這個圖中可以看到在Arguments欄中有特別多的小屬性,其中有一項就是"Maximum priority",這項的意思就是說可以定義優先順序的最大值,其實
想想也是,不可能我們定義的優先順序是一個非常大的數字,比如int.MaxValue,大多情況下都是10以內的數字就可以了,再或者我們曾今接觸過的 MSMQ,
它的優先順序只是一些枚舉值,什麼High,Normal,Low,不知道大家可否記得? 下麵來看下代碼中該如何實現呢???
1. 在Queue上附加優先順序屬性
Dictionary<string, object> dic = new Dictionary<string, object>(); dic.Add("x-max-priority", 20); channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: dic);
上面的代碼做了一個簡單的隊列聲明,queuename="hello",持久化,排外。。。然後把"x-max-priority"塞入到字典中作為arguments參數,看起來還
是非常簡單吧~~~
2. 在Message上指定優先順序屬性
var properties = channel.CreateBasicProperties(); properties.Priority = 1; channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
通過上面的代碼可以看到,在Message上設置優先順序,我是通過在channel通道上設置Priority屬性,之後塞到basicProperties中就可以了,好了,有上面這兩
個基礎之後,下麵就可以開始測試了,準備向rabbitmq推送10條記錄,其中第5條的優先順序最高,所以應該首先就print出來,如下圖:
static void Main(string[] args) { var sb = new StringBuilder(); for (int i = 0; i < 11; i++) { sb.Append(i); } var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "mydirect", type: ExchangeType.Direct, durable: true); Dictionary<string, object> dic = new Dictionary<string, object>(); dic.Add("x-max-priority", 20); for (int i = 0; i < 10; i++) { channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: dic); string message = string.Format("{0} {1}", i, sb.ToString()); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Priority = (i == 5) ? (byte)10 : (byte)i; channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", i); } } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
圖中可以看到10條消息我都送到rabbitmq中去了,接下來打開consume端,來看看所謂的index=5 是否第一個送出來??
static void Main(string[] args) { for (int m = 0; m < int.MaxValue; m++) { var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { var result = channel.BasicGet("hello", true); if (result != null) { var str = Encoding.UTF8.GetString(result.Body); Console.WriteLine("{0} 消息內容 {1}", m, str); System.Threading.Thread.Sleep(1); } } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
一切都是這麼的完美,接下來為了進行可視化驗證,你可以在WebUI中觀察觀察,可以發現在Queue上面多了一個 Pri 標記,有意思吧。
好了,這麼重要的功能,是不是已經讓你足夠興奮啦, 希望大家能夠好好的在實際場景中運用吧~~~