回到目錄 redis的客戶端有很多,這次用它的pub/sub發佈與訂閱我選擇了StackExchange.Redis,發佈與訂閱大家應該很清楚了,首先一個訂閱者,訂閱一個服務,服務執行一些處理程式(可能是寫個日誌,插入個數據,發個email)然後當另一個項目的某個業務發佈這個服務後,被訂閱的程式將會 ...
redis的客戶端有很多,這次用它的pub/sub發佈與訂閱我選擇了StackExchange.Redis,發佈與訂閱大家應該很清楚了,首先一個訂閱者,訂閱一個服務,服務執行一些處理程式(可能是寫個日誌,插入個數據,發個email)然後當另一個項目的某個業務發佈這個服務後,被訂閱的程式將會被執行,這個聽起來很有意思,redis有好的實現了這個pub/sub功能。
看一下結構圖
Sub訂閱(消息消費者)
對於訂閱方,這裡類似於一個服務,它會長期運行著,被啟動後動態訂閱一些服務進來,以便以後再被其它服務調用
//sub a function in A-project PubSubManager.Instance.Subscribe("UserLog", (msg) => { //訂閱者處理自己的業務邏輯,這相關於隊列服務要乾的事 Console.WriteLine(msg); });
Pub發佈(消息生產者)
而對於其它項目,如A網站,它可能在被在POST動作後發佈這個UserLog的服務,這時上面的訂閱方將會消費它(消費者模式),或者服務代碼被執行!
[HttpPost] public ActionResult Index(string user) { PubSubManager.Instance.Publish("UserLog", user + "這個用戶提交表單了"); return View(); }
對於一直運行的服務,將會收到來自不同項目的消息,而它只負責消費它!
Lind.DDD.PublishSubscribe
封裝了一些標準的pub/sub方法,它可以有多種實現方法,本例使用redis這個中間件
/// <summary> /// 發佈訂閱的介面規則 /// </summary> public interface IPubSub { /// <summary> /// 發佈,有順序,對象源是字元串 /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void Publish(string channel, string value); /// <summary> /// 訂閱,對象源是字元串 /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void Subscribe(string channel, Action<string> action); /// <summary> /// 非同步發佈,無順序,對象源是字元串 /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void PublishAsync(string channel, string value); /// <summary> /// 非同步訂閱,無順序,對象源是字元串 /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeAsync(string channel, Action<string> action); /// <summary> /// 發佈,有順序,對象源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void PublishByte(string channel, byte[] value); /// <summary> /// 訂閱,對象源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeByte(string channel, Action<byte[]> action); /// <summary> /// 非同步發佈,有順序,對象源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void PublishByteAsync(string channel, byte[] value); /// <summary> /// 非同步訂閱,對象源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeByteAsync(string channel, Action<byte[]> action); /// <summary> /// 發佈,有順序,對象源是泛型對象 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="value"></param> void Publish<T>(string channel, T value); /// <summary> /// 訂閱,對象源是泛型對象 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="action"></param> void Subscribe<T>(string channel, Action<T> action); /// <summary> /// 非同步發佈,有順序,對象源是泛型對象 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="value"></param> void PublishAsync<T>(string channel, T value); /// <summary> /// 非同步訂閱,對象源是泛型對象 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeAsync<T>(string channel, Action<T> action); /// <summary> /// 取消指定訂閱 /// </summary> /// <param name="channel"></param> void UnSubscribe(string channel); /// <summary> /// 取消所有訂閱 /// </summary> /// <param name="channel"></param> void UnSubscribeAll(); }