MessageQueen,顧名思義消息隊列,在系統開發中也是用的比較多的一個中間件吧。我們這裡主要用它來做日誌管理和訂單管理的,記得老老大(恩,是的,就是老老大,因為他已經跳槽了)還在的時候,當時也是為了趕項目進度,他也參與開發了,那時候我才剛剛入職,他負責寫後端這塊,我來了就把他手上的任務接過來了 ...
MessageQueen,顧名思義消息隊列,在系統開發中也是用的比較多的一個中間件吧。我們這裡主要用它來做日誌管理和訂單管理的,記得老老大(恩,是的,就是老老大,因為他已經跳槽了)還在的時候,當時也是為了趕項目進度,他也參與開發了,那時候我才剛剛入職,他負責寫後端這塊,我來了就把他手上的任務接過來了,(接著接著……就辭職了)。
之後我們的開發仍然有條不紊的開發著,直到今年的一月份吧,才上線開始運行,然後就出現了常規狀態,上線之後就開始爆炸,
這個頁面打不開呀,那個內容沒東西呀,第三方登錄問題呀,支付問題呀,臨時再改需求呀……(該來的都來了),加班、debug、測試、再debug……,然後經過幾天的修複,終於完成了跟自己電腦一樣穩定的運行,組員們都美滋滋的,今晚加個雞腿才行。
都說禍不單行,古人是不會騙我們的,Bug怎麼會修得完呢?天真,要是Bug能修得完還要我們來幹啥,好景不長,果然,過了一周之後,組員突然群里叫喳喳,
what is it ?
來了,今天的主角登場了,我也要開始加班了。
RabbitMQ
這個是今天要說的東西,基礎概念什麼的不是今天要說的重點,重點是:
RabbitMQ記憶體暴漲!使得整個伺服器瀕臨癱瘓,遠程登錄伺服器都差點擠不進去的狀態,別看截圖目前才1.3G,吃個午飯回來,就2.3G了,可怕不可怕?咋回事?
老闆喊你回來加班啦
先不管了,線上優先解決,手動先Reset回收資源以釋放空間,這個只是臨時的辦法,然後檢查一下rabbitMQ的配置有沒有問題,路徑在
C:\Users\Administrator\AppData\Roaming\RabbitMQ完全是預設的配置,完全ojbk啊,那到底咋回事?繼續檢查,想想不如從項目開始吧,然後查看項目中的代碼,都是從來自【MessageLib】的組件調用
好了,叫我老老大要這個組件的代碼,他把git的地址就發給我,我把項目down下來,
這個封裝的組件內容不多,主要的文件一目瞭然,其實就是用到這個兩個組件來進行的二次封裝來調用
主要的代碼是在【MessageQueue.cs】文件里,展示一下當時的代碼情況:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using MessageLib.ClassBean; using EasyNetQ; using System.Threading; namespace MessageLib { public static class MessageQueue { public static IBus bus = MQBusBuilder.CreateMessageBus(); //消息隊列 private static Queue<Item> NoticQueue = new Queue<Item>(5000); //日誌隊列 private static Queue<Item> LogQueue = new Queue<Item>(5000); //隊列數目發佈數量 private static int max_count_to_pulish = 1000; /// <summary> /// 可供外部使用的消息入列操作 /// </summary> public static void push(Item item) { if (item.type == ItemType.notic) { NoticQueue.Enqueue(item); } if (item.type == ItemType.log) { LogQueue.Enqueue(item); } } /// <summary> /// 監聽後需要調用的發佈介面 /// </summary> private static void Pulish(object source, System.Timers.ElapsedEventArgs e) { if (NoticQueue.Count > 0 || LogQueue.Count > 0) { if (bus == null || !bus.IsConnected) { bus = MQBusBuilder.CreateMessageBus(); } if (bus.IsConnected) { Send(ItemType.notic); Send(ItemType.log); } } } /// <summary> /// 程式自運行並開始監聽 /// </summary> public static void Run() { System.Timers.Timer timer = new System.Timers.Timer(); timer.Interval = 1000; timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到達時間的時候執行事件; timer.AutoReset = true;//設置是執行一次(false)還是一直執行(true); timer.Enabled = true;//是否執行System.Timers.Timer.Elapsed事件; } /// <summary> /// 啟動線程非同步調用 /// </summary> /// <param name="channelType"></param> private static void Send(string channelType) { Thread thread = new Thread(new ParameterizedThreadStart(PublishAction)); thread.IsBackground = true; thread.Start(channelType); } /// <summary> /// 調用發佈日誌及提醒兩個介面 /// </summary> /// <param name="channel"></param> private static void PublishAction(object channel) { PublisLog(); PublisNotic(); } /// <summary> /// 日誌消息發送至RabbitMQ指定exchange、Queue /// </summary> private static void PublisLog() { string channelName = ItemType.log; try { var routingKey = channelName; var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName)); var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}",channelName), "direct"); var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey); while (LogQueue.Count > 0) { Item item = LogQueue.Dequeue(); if (item != null) { var properties = new MessageProperties(); var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item)); Message.Properties.AppId = item.appid; bus.Advanced.Publish(exchange, routingKey, false, Message); } } } catch (Exception ex) { throw ex; } } /// <summary> /// 提醒消息發送至RabbitMQ指定exchange、Queue /// </summary> private static void PublisNotic() { string channelName = ItemType.notic; var routingKey = channelName; var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName)); var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct"); var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey); while(NoticQueue.Count > 0) { Item item = NoticQueue.Dequeue(); if (item != null) { var properties = new MessageProperties(); var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item)); Message.Properties.AppId = item.appid; bus.Advanced.Publish(exchange, routingKey, false, Message); } } } } }View Code
然後我就發現了這一段代碼!
/// <summary> /// 程式自運行並開始監聽 /// </summary> public static void Run() { System.Timers.Timer timer = new System.Timers.Timer(); timer.Interval = 1000; timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到達時間的時候執行事件; timer.AutoReset = true;//設置是執行一次(false)還是一直執行(true); timer.Enabled = true;//是否執行System.Timers.Timer.Elapsed事件; }
/// <summary> /// 啟動線程非同步調用 /// </summary> /// <param name="channelType"></param> private static void Send(string channelType) { Thread thread = new Thread(new ParameterizedThreadStart(PublishAction)); thread.IsBackground = true; thread.Start(channelType); }
老老大寫Bug了,當Run()起來之後,隊列中【NoticQueue】有內容,就開始推送消息,發送消息Send(),每來一次推送new一個線程並設置為後臺線程,然後發送消息。好了,明白了,這裡的線程很混亂,因為線程操作不當,new了N多個頻道,並且沒有主動回收,這也難怪記憶體暴漲呢。並且要是Run()調用多次,後果更加不堪設想。
加班改起來
開始動手吧,業務主要推送有普通消息、錯誤消息和通知消息,那麼將隊列與線程組裝一起,新增一個類QueueTask.cs:
public class QueueTask { private Queue<Item> NoticQueue = new Queue<Item>(5000); //隊列數目發佈數量 private int max_count_to_pulish = 1000; public bool isRunning = false; private string itemType = ItemType.info; private string MessageRouter = ItemType.info; public QueueTask(string itemType,string MessageRouter) { this.itemType = itemType; this.MessageRouter = MessageRouter; } /// <summary> /// 可供外部使用的消息入列操作 /// </summary> public void Push(Item item, IBus IBus) { NoticQueue.Enqueue(item); if (!isRunning) Run(IBus); } public void Run(IBus IBus) { if (!isRunning) { Timer timerNotic = new Timer(PulishMsg, IBus, 1000, 1000); isRunning = true; } } private void PulishMsg(object state) { IBus IBus = state as IBus; if (NoticQueue.Count > 0) { PublisMsg(itemType, IBus); } } private void PublisMsg(object channel, IBus BusInstance) { try { string channelName = channel as string; if (NoticQueue.Count > 0) { var routingKey = MessageRouter; var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName)); var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct"); var binding = BusInstance.Advanced.Bind(exchange, mqqueue, routingKey); while (NoticQueue.Count > 0) { Item item = NoticQueue.Dequeue(); if (item != null) { var properties = new MessageProperties(); var Message = new EasyNetQ.Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item)); Message.Properties.AppId = item.appid; BusInstance.Advanced.Publish(exchange, routingKey, false, Message); } } } } catch (Exception ex) { Console.WriteLine("PublisMsg error:" + ex.Message); } } public void Read<T>(IBus BusInstance,Action<Item> dealAction) where T : Item { try { string channelName = itemType; var routingKey = MessageRouter; var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName)); var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct"); var binding = BusInstance.Advanced.Bind(exchange, mqqueue, routingKey); var Consume = BusInstance.Advanced.Consume(mqqueue, registration => { registration.Add<string>((message, info) => { Item data = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message.Body); dealAction(data); }); }); } catch (Exception ex) { Console.WriteLine("Read error:" + ex.Message); } } }
然後,在MessageQueue.cs修改為單例模式:
public static class MessageQueue { /*Install-Package EasyNetQ-dotnet-core -Version 2.0.2-radicalgeek-netc0001 -Pre*/ private static IBus bus = null; public static bool isRunning = false; //消息隊列 private static QueueTask NoticQueue = null; //日誌隊列 private static QueueTask LogQueue = null; //自定義 private static QueueTask InfoQueue = null; #region 同步鎖 private static readonly object obj = new object(); #endregion public static void Init(string Connection, string routeKey) { if (NoticQueue == null) NoticQueue = new QueueTask(ItemType.notic, ItemType.notic); if (LogQueue == null) LogQueue = new QueueTask(ItemType.error, ItemType.error); if (InfoQueue == null) InfoQueue = new QueueTask(ItemType.info, routeKey); if (string.IsNullOrEmpty(MQBusBuilder.Connnection)) MQBusBuilder.Connnection = Connection; } public static IBus BusInstance { get { if (bus == null) { lock (obj) { if (bus == null|| !bus.IsConnected) { bus = MQBusBuilder.CreateMessageBus(); } } } return bus; } } /// <summary> /// 可供外部使用的消息入列操作 /// </summary> public static void PushAndRun(Item item) { if (string.IsNullOrWhiteSpace(MQBusBuilder.Connnection) || BusInstance == null) return; if (item.type == ItemType.notic) { NoticQueue.Push(item, BusInstance); } if (item.type == ItemType.error) { LogQueue.Push(item, BusInstance); } if (item.type == ItemType.info) { InfoQueue.Push(item, BusInstance); } } public static void Read(string itemType, Action<Item> dealAction) { if (itemType == ItemType.notic) { NoticQueue.Read<NoticItem>(BusInstance, dealAction); } if (itemType == ItemType.error) { LogQueue.Read<ErrorItem>(BusInstance, dealAction); } if (itemType == ItemType.info) { InfoQueue.Read<Message>(BusInstance, dealAction); } } }View Code
每次推送消息的時候,每個QueueTask就自己維護自己的線程和隊列了,當調用推送之後,就開始運作起來。恩,應該沒問題了。然後就發佈nuget,再更新項目,然後發佈。觀察一段時間,恩,完美。
事件二
事情過後,B端開始搞起來了,然後涉及到訂單系統,跟老大(不是老老大,老老大那時候已經跑了)商量之後確定使用消息隊列來做訂單的事件的拓展,然後就直接美滋滋的調用好之前寫的了,沒想到啊,這次是線程暴漲!因為訂單是從B端推送過來的,B端肯定沒事,訂單後臺訂閱消息之後,讀取過程中出現的線程增多,然後看看之前寫的Read()方法,感覺沒啥問題啊,每運行完一次,就多了一個線程,這個神奇了啊,那麼源代碼擼起來。
https://github.com/EasyNetQ/EasyNetQ
翻來覆去,看到這個Consume方法,繼承的是IDisposable介面,得勒,知道咋回事了。
Consume.Dispose(); 用完請記得主動釋放啊。
這回真的可以浪了。
總結
遇到問題,冷靜下來,耐得了寂寞才行。線上的問題優先解決,然後再慢慢Debug,解決不了,看源碼,再解決不了,降級處理,歡迎共同探討。同時也感謝一下技術群里的兄弟給的一些建議,並幫忙查找資料,還好EasyNetQ是開源了,不然也打算說先不用了,畢竟一開始沒什麼用戶量,所以沒必要整那麼麻煩,加班加點的弄這個問題。不過最終都完美的解決了,心裡還是挺美滋滋的,程式猿隨之而來的成就感。
別看我們在工位上默不作聲,我們可能在拯救世界呢!老闆,該加工資啦!