回到目錄 很久就想寫一套屬於自己的消息隊列組件,前段時候看了湯雪華同學的EQueue,感覺還是不錯的,他也是看了rabbitMQ之後寫的Equeue,在設計上與前者有類似的地方,而大叔這次準備寫一個LindQueue,當前整體架構都差不多,無非是生產者,管道,消費者三個角色,而核心部分就是管道Bro ...
很久就想寫一套屬於自己的消息隊列組件,前段時候看了湯雪華同學的EQueue,感覺還是不錯的,他也是看了rabbitMQ之後寫的Equeue,在設計上與前者有類似的地方,而大叔這次準備寫一個LindMQ,當前整體架構都差不多,無非是生產者,管道,消費者三個角色,而核心部分就是管道Broker這個東西了,為生產者提供了push操作;而為消費者又提供了Pull操作;為瞭解耦考慮,他們之前沒有直接的引用關係,通訊採用tcp的方式,Broken的消息存儲介質我們使用redis,生產者和消費者與Broker的通訊我們採用FastSocket這個組件。
LindMQ設計架構圖
關於MQ系統的三大對象
以下三大對象其實都有各自的實現,各自的平臺,各自也都需要一個宿主!
Producer
生產者,用來將消息從源平臺發送到目標隊列中,目標隊列用到存儲消息,這裡一般指Broker,它可以是一種集群環境,它會封裝對存儲消息介質的入隊與出隊的基本操作,而對於真實的存儲介質,生產者是不需要知道的!
Broker
消息處理者,用來處理消息,加工消息,存儲消息等,它會公開基本的推消息介面和拉消息介面!
Consumer
消費者,用來處理Broker里的消息,它一般通過長連接,定時向Broker裡拉消息的方法實現,基於實時性考慮,又出現了pub/sub這種發佈與訂閱模式,當消費方訂閱了某種消息主題(topic)之後,有這種消息產生時,broker會把消息自動推到消息方!
關於消息的上下文
消息上下文,我們可以把它看成是承載消息的對象,它會有topic主題,queueId消息隊列索引,queueOffset內容索引,body消息體組成,它相關於是producer,broker和consumer之間定義的一種數據協議,他們之間通訊使用這種公開的協議,在LinqQueue裡面消息協議我們稱為LindMQ,下麵看一下協議的內容
/// <summary> /// 消息協議 /// </summary> [Serializable] public class LindMQ { /// <summary> /// 消息所屬Topic,每種Topic有一種類型的Body /// </summary> public string Topic { get; set; } /// <summary> /// 消息內容,Redis里存儲為Json /// </summary> public string Body { get; set; } /// <summary> /// 消息所屬的隊列ID /// </summary> public int QueueId { get; internal set; } /// <summary> /// 消息在所屬隊列的序號 /// </summary> public long QueueOffset { get; internal set; } /// <summary> /// 消息的存儲時間 /// </summary> internal DateTime CreateTime { get; set; } /// <summary> /// 將消息對象序列化成字元 /// </summary> /// <returns></returns> public override string ToString() { return Utils.SerializeMemoryHelper.SerializeToJson<LindMQ>(this); } }
通過上面代碼我們可以看到,Topic,QueueId,QueueOffset,Body等欄位,這也是一個消息協議也需要的主要信息了,Topic是消息主題,我們可以這樣認為,一種主題就是一類消息,QueueId它位於Topic消息主題下麵,一個topic可以包括多個queue,而QueueOffset是每個queue里的消息索引,類似你的消息在消息隊列里排在第幾位;而body就是我們的消息體了,它使用二時制流表示,這樣有利於網路傳輸!
好了,今天主要是對LinqQueue做一個簡單的介紹,下次我再繼續介紹LindQueue!