模塊(類)之間解耦利器:EventPublishSubscribeUtils 事件發佈訂閱工具類

来源:https://www.cnblogs.com/zuowj/archive/2020/05/07/12810016.html
-Advertisement-
Play Games

如果熟悉C 語言的小伙伴們一般都會知道委托、事件的好處,只需在某個類中提前定義好公開的委托或事件(委托的特殊表現形式)變數,然後在其它類中就可以很隨意的訂閱該委托或事件,當委托或事件被觸發執行時,會自動通知所有的訂閱者進行消費處理。(觀察者模式用委托來實現是最好不過了,DDD所提倡的事件驅動其根本理 ...


如果熟悉C#語言的小伙伴們一般都會知道委托、事件的好處,只需在某個類中提前定義好公開的委托或事件(委托的特殊表現形式)變數,然後在其它類中就可以很隨意的訂閱該委托或事件,當委托或事件被觸發執行時,會自動通知所有的訂閱者進行消費處理。(觀察者模式用委托來實現是最好不過了,DDD所提倡的事件驅動其根本理念也是如此),當然我這裡想到的是不需要在每個類中進行定義委托或事件,而是由一個統一的中介者(即EventPublishSubscribeUtils)來提供事件的訂閱及發佈操作,這樣各模塊之間無需直接依賴,只需通過中介者完成發佈通知與訂閱回調即可,何樂而不為呢?

這裡我先藉助C#語言獨有的委托類型快速實現了一個簡易的EventPublishSubscribeUtils,代碼如下:

    /// <summary>
    /// 自定義事件發佈訂閱回調工具類(業務解藕、關註點分離,避免互相依賴)--演示版
    /// EventBus簡化版,觀察者模式
    /// author:zuowenjun
    /// </summary>
    public static class EventPublishSubscribeUtils
    {
        private static ConcurrentDictionary<Type, EventHandler<object>> EventHandlers { get; } = new ConcurrentDictionary<Type, EventHandler<object>>();

        private static void removeRegisters(ref EventHandler<object> srcEvents, EventHandler<object> removeTargetEvents)
        {
            var evtTypes = removeTargetEvents.GetInvocationList().Select(d => d.GetType());
            var registeredEventHandlers = Delegate.Combine(srcEvents.GetInvocationList().Where(ei => evtTypes.Contains(ei.GetType())).ToArray());
            srcEvents -= (EventHandler<object>)registeredEventHandlers;
        }

        public static void Register<T>(EventHandler<object> eventHandlers)
        {
            EventHandlers.AddOrUpdate(typeof(T), eventHandlers,
                (t, e) =>
                {
                    //先根據訂閱委托類型匹匹配過濾掉之前已有的相同訂閱,然後再重新訂閱,防止重覆訂閱,多次執行的情況。
                    removeRegisters(ref e, eventHandlers);
                    e += eventHandlers;
                    return e;
                });
        }


        public static void UnRegister<T>(EventHandler<object> eventHandlers = null)
        {
            Type eventMsgType = typeof(T);
            if (eventHandlers == null)
            {
                EventHandlers.TryRemove(eventMsgType, out eventHandlers);
                return;
            }

            var e = EventHandlers[eventMsgType];
            removeRegisters(ref e, eventHandlers);
        }

        public static void PublishEvent<T>(T eventMsg, object sender)
        {
            Type eventMsgType = eventMsg.GetType();
            if (EventHandlers.ContainsKey(eventMsgType))
            {
                EventHandlers[eventMsgType].Invoke(sender, eventMsg);
            }
        }
    }

然後使用就比較簡單了,我們只需通過EventPublishSubscribeUtils.Register註冊訂閱事件消息,通過EventPublishSubscribeUtils.PublishEvent發佈事件通知,這樣就可以讓兩個甚至多個不相關的模塊(類)能夠通過消息類型實現1對多的通訊與協同處理。使用示例代碼如下:


    class EventMessage
    {
        public string Name { get; set; }

        public string Msg { get; set; }

        public DateTime CreatedDate { get; set; }
    }

    class DemoA
    {
        public DemoA()
        {
            EventHandler<object> eventHandlers = EventCallback1;
            eventHandlers += EventCallback2;

            EventPublishSubscribeUtils.Register<EventMessage>(eventHandlers);
        }

        private void EventCallback1(object sender, object e)
        {
            string json = JsonConvert.SerializeObject(e);
            System.Diagnostics.Debug.WriteLine($"EventCallback1=> sender:{sender},e:{json}");
        }

        private void EventCallback2(object sender, object e)
        {
            string json = JsonConvert.SerializeObject(e);
            System.Diagnostics.Debug.WriteLine($"EventCallback2=> sender:{sender},e:{json}");
        }

    }

    class DemoB
    {
        public void ShowMsg(string name, string msg)
        {
            System.Diagnostics.Debug.WriteLine($"ShowMsg=> name:{name},msg:{msg}");
            var eventMsg = new EventMessage
            {
                Name = name,
                Msg = msg,
                CreatedDate = DateTime.Now
            };
            EventPublishSubscribeUtils.PublishEvent(eventMsg, nameof(DemoB.ShowMsg));
        }
    }

//main方法中使用:
var demoA = new DemoA();
var demoB = new DemoB();

demoB.ShowMsg("夢在旅途", "i love csharp and java!");

從上述示例代碼中可以看出,DemoA與DemoB各為獨立,互不依賴,它們都不知道有對方的存在,它們只關心業務的處理,通過執行demoB.ShowMsg方法進而觸發回調demoA.EventCallback1,demoA.EventCallback2方法,是不是比起直接從DemoA中調DemoB更好呢?

c#有委托類型(方法的引用),那如果是在java中該如何實現呢?

其實同理,我們可以藉助匿名內部類+匿名實現類的方式(如:函數式介面)實現與C#異曲同工的效果,同樣可以實現類似的事件發佈與訂閱功能,如下便是採用java語言的實現EventPublishSubscribeUtils類的代碼:

這個因項目需要,我特意實現了兩種模式,一種支持1對多的普通方式,另一種支持1對1的訂閱回調方式,有返回值。


/**
 * 自定義事件發佈訂閱回調工具類(業務解藕、關註點分離,避免互相依賴)
 * EventBus簡化版,觀察者模式
 * <pre>
 * 支持兩種模式
 * 1.無返回值:訂閱事件消費(register)+ 發佈事件消息(publishEvent/publishEventAsync)
 * 2.有返回值:監聽回調通知處理(listenCallback)+通知回調(notifyCallback),通過notifyMessageType+MessageChannel 即可標識唯一的一組通知回調與監聽回調處理
 * <pre>
 * @author zuowenjun
 * @date 20200310
 */
public final class EventPublishSubscribeUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventPublishSubscribeUtils.class);

    private static final Map<Class<?>, LinkedList<Consumer<Object>>> eventConsumers = new ConcurrentHashMap<>();

    private static final Map<Class<?>, ConcurrentHashMap<MessageChannel, Function<Object, Object>>> callbackFuncs = new ConcurrentHashMap<>();

    private EventPublishSubscribeUtils() {
    }


    /**
     * 註冊事件回調消費者
     * 用法:EventSubscribeConsumeUtils.register(this::xxxx方法) 或lambda表達式
     * 註意:若回調方法添加了事務註解,則應指派其代理對象的方法來完成回調,如:
     * EventSubscribeConsumeUtils.register((xxxService)SpringUtils.getBean(this.class)::xxxx方法)
     *
     * @param eventConsumer
     */
    public static void register(Class<?> eventMessageType, Consumer<Object> eventConsumer) {

        if (eventConsumer == null) {
            return;
        }

        LinkedList<Consumer<Object>> eventConsumerItems = null;
        if (!eventConsumers.containsKey(eventMessageType)) {
            eventConsumers.putIfAbsent(eventMessageType, new LinkedList<>());
        }
        eventConsumerItems = eventConsumers.get(eventMessageType);

        eventConsumerItems.add(eventConsumer);
    }

    /**
     * 取消訂閱回調
     *
     * @param eventMessageType
     * @param eventConsumer
     */
    public static void unRegister(Class<?> eventMessageType, Consumer<Object> eventConsumer) {
        if (!eventConsumers.containsKey(eventMessageType)) {
            return;
        }

        LinkedList<Consumer<Object>> eventConsumerItems = eventConsumers.get(eventMessageType);
        int eventConsumerIndex = eventConsumerItems.indexOf(eventConsumer);
        if (eventConsumerIndex == -1) {
            return;
        }
        eventConsumerItems.remove(eventConsumerIndex);
    }


    /**
     * 發佈事件,同步觸發執行回調事件消費者方法(存在阻塞等待),即事件消息生產者
     * 用法:在需要觸發事件消息回調時調用,如:publishEvent(eventMessage);
     *
     * @param eventMessage
     */
    public static <T> void publishEvent(T eventMessage) {
        Class<?> eventMessageType = eventMessage.getClass();

        if (!eventConsumers.containsKey(eventMessageType)) {
            return;
        }

        LOGGER.info("事件已發佈,正在執行通知消費:{}", JSONObject.toJSONString(eventMessage));

        for (Consumer<Object> eventConsumer : eventConsumers.get(eventMessageType)) {
            try {
                eventConsumer.accept(eventMessage);
            } catch (Exception ex) {
                LOGGER.error("eventConsumer.accept error:{},eventMessageType:{},eventMessage:{}",
                        ex, eventMessageType, JSONObject.toJSONString(eventMessage));
            }
        }
    }


    /**
     * 發佈事件,非同步觸發執行回調事件消費者方法(非同步非阻塞),即事件消息生產者
     * 用法:在需要觸發事件消息回調時調用,如:publishEventAsync(eventMessage);
     *
     * @param eventMessage
     */
    public static <T> void publishEventAsync(final T eventMessage) {
        Executor asyncTaskExecutor = (Executor) SpringUtils.getBean("asyncTaskExecutor");
        asyncTaskExecutor.execute(() -> {
            publishEvent(eventMessage);
        });
    }


    /**
     * 監聽回調處理(需要有返回值),即有返回值的回調消費者
     *
     * @param notifyMessageType
     * @param messageChannel
     * @param callbackFunc
     */
    public static void listenCallback(Class<?> notifyMessageType, MessageChannel messageChannel, Function<Object, Object> callbackFunc) {
        if (!callbackFuncs.containsKey(notifyMessageType)) {
            callbackFuncs.putIfAbsent(notifyMessageType, new ConcurrentHashMap<>());
        }

        Map<MessageChannel, Function<Object, Object>> functionMap = callbackFuncs.get(notifyMessageType);
        if (!functionMap.containsKey(messageChannel)) {
            functionMap.putIfAbsent(messageChannel, callbackFunc);
        } else {
            LOGGER.error("該通知消息類型:{}+消息通道:{},已被訂閱監聽,重覆訂閱監聽無效!", notifyMessageType.getSimpleName(), messageChannel.getDescription());
        }

    }

    /**
     * 通知回調(同步等待獲取監聽回調的處理結果),即生產者
     *
     * @param notifyMessage
     * @param messageChannel
     * @param <R>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <R> R notifyCallback(Object notifyMessage, MessageChannel messageChannel) {
        Class<?> notifyMessageType = notifyMessage.getClass();

        Map<MessageChannel, Function<Object, Object>> functionMap = callbackFuncs.getOrDefault(notifyMessageType, null);
        if (functionMap != null) {
            Function<Object, Object> callbackFunction = functionMap.getOrDefault(messageChannel, null);
            if (callbackFunction != null) {
                LOGGER.info("通知回調消息已發佈,正在執行回調處理:{},messageChannel:[{}]", JSONObject.toJSONString(notifyMessage), messageChannel.getDescription());
                Object result = callbackFunction.apply(notifyMessage);
                try {
                    return (R) result;
                } catch (ClassCastException castEx) {
                    throw new ClassCastException(String.format("監聽回調處理後返回值實際類型與發佈通知回調待接收的值預期類型不一致,導致類型轉換失敗:%s," +
                                    "請確保notifyCallback與listenCallback針對通知消息類型:%s+消息通道:%s返回值類型必需一致。",
                            castEx.getMessage(), notifyMessageType.getSimpleName(), messageChannel.getDescription()));
                }

            }
        }
        return null;
    }


}

當然如果需要實現1對1的通訊,除了指定消息類型外,還需要指定消息通訊通道(即:唯一標識),目的是可以實現同一種消息類型,支持不同的點對點的處理。

/**
 * 自定義消息通道
 * 作用:用於識別同一個消息類型下不同的監聽回調者(notifyMessage+messageChannel 即可標識唯一的一組通知回調[生產者]與監聽回調[消費者])
 * @author zuowenjun
 * @date 2020-03-31
 */
public enum MessageChannel {
    None("無效"),
    MSG_A("測試消息A"),
    ;

    private String description;

    MessageChannel(String description) {
        this.description=description;
    }

    public String getDescription() {
        return description;
    }
}

使用方法示例代碼如下:


@Service
public class DemoAService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoAService.class);
    

    public void showMsg(String name, String msg) {
        System.out.printf("【%1$tF %1$tT.%1$tL】hello!%s,DemoAService showMsg:%s %n", new Date(), name, msg);

        EventMessage eventMessage = new EventMessage();
        eventMessage.setName("aaa");
        eventMessage.setMsg("test");
        eventMessage.setCreatedDate(new Date());
        EventPublishSubscribeUtils.publishEvent(eventMessage);

        String msgJsonStr = EventPublishSubscribeUtils.notifyCallback(eventMessage, MessageChannel.MSG_A);

        System.out.printf("【%1$tF %1$tT.%1$tL】DemoAService showMsg notifyCallback json result:%2$s %n", new Date(), msgJsonStr);
    }

}


@Service
public class DemoBService {

    @PostConstruct
    public void init(){

        //訂閱消費,無返回值,支持1對多,即:同一個消息類型可同時被多個消費者訂閱
        EventPublishSubscribeUtils.register(EventMessage.class,this::showFinishedMsg);

        //訂閱監聽回調,有返回值,只能1對1
        EventPublishSubscribeUtils.listenCallback(EventMessage.class, MessageChannel.MSG_A,this::getMsgCallbak);
    }

    private void showFinishedMsg(Object eventMsg){
        EventMessage eventMessage=(EventMessage)eventMsg;
        System.out.printf("【%1$tF %1$tT.%1$tL】%s,receive msg:%s doing...%n",
                eventMessage.getCreatedDate(),eventMessage.getName(),eventMessage.getMsg());

        //模擬邏輯處理
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.printf("【%1$tF %1$tT.%1$tL】%s,do finished!!!%n",new Date(),eventMessage.getName());
    }

    private String getMsgCallbak(Object eventMsg){
        EventMessage eventMessage=(EventMessage)eventMsg;
        eventMessage.setMsg(eventMessage.getMsg()+"--callback added!");
        eventMessage.setCreatedDate(new Date());

        System.out.printf("【%1$tF %1$tT.%1$tL】%s,do msg callback!!!%n",new Date(),eventMessage.getName());

        return JSONObject.toJSONString(eventMessage);
    }

}

//在某個入口的service中調用(如需演示可定義實現ApplicationRunner的run方法,在run方法中執行調用即可):

@Autowired
private DemoAService demoAService;

demoAService.showMsg("zwj","i love java 2020!");

如上代碼所示,我們藉助於EventPublishSubscribeUtils,解耦了兩個Service Bean之間的依賴,避免了迴圈依賴的問題,去掉了之前為瞭解決迴圈依賴而使用@Lazy註解的方式,更易於擴展與更改。其實Spring底層也使用了類似的Event機制,說明這種方式還是有合適的用武之地的。

這裡我通過簡單的關係圖來對比未引用EventPublishSubscribeUtils前與引用後的區別,大家可以感受一下哪種更方便:
之前:

之後:

最後,關於業務解耦,分清業務邊界,我個人認為跨進程通訊使用MQ,同進程跨多模塊(類,或者說跨多業務邊界)可使用Event事件驅動思路來解決。大家覺得如何呢?如果有更好的方案歡迎評論交流,謝謝。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1 #include <Windows.h> 2 #include <iostream> 3 using namespace std; 4 5 int main() 6 { 7 while(true) 8 { 9 if(-32767 == GetAsyncKeyState('A')) //不支持大小 ...
  • 1. OpenCV讀取圖片 1.1 簡述 OpenCV讀取圖片的方法是cv2.imread(),讀取出來圖片的格式是BGR與常規的彩色圖像的格式(RGB)相反,這一點一定要註意。 OpenCV顯示圖片的方法是cv2.imshow(),顯示的格式是BGR。 小生就言於此O(∩_∩)O哈哈~,直接上例子 ...
  • 消息隊列中間件是分散式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中間件。消息形式支持點對點和訂閱-發佈。 ActiveMQ是什麼 1、ActiveMQ是消息隊列技術,為解決高併發問題而生 2、ActiveMQ生 ...
  • 我們在的項目組呢,有一項工作是,收郵件(很大程度上使用郵件是為了存個底),然後從我們的系統裡邊查一下相關信息,然後回覆個郵件的工作。雖然工作量並不大,但是會把時間切的稀碎。為了拯救我的時間,所以做了一個郵件的值班機器人。讓他來頂替我自動回覆郵件,考慮到這個東西應該也有不少人會用得到,所以就把這個東西 ...
  • 數據很重要 在介紹MyBatis事務之前,先普及下資料庫事務相關知識 事務(Transaction)是訪問並可能更新資料庫中各種數據項的一個程式執行單元(unit)。事務通常由高級資料庫操縱語言或編程語言(如SQL,C++或Java)書寫的用戶程式的執行所引起,並用形如begin transacti ...
  • 原創聲明:本文轉載自公眾號【胖滾豬學編程】​ 某日,胖滾豬寫的代碼導致了一個生產bug,奮戰到凌晨三點依舊沒有解決問題。胖滾熊一看,只用了一個volatile就解決了。並告知胖滾豬,這是併發編程導致的坑。這讓胖滾豬堅定了要學好併發編程的決心。。於是,開始了我們併發編程的第一課。 序幕 BUG源頭之一 ...
  • 1、創建聊天消息表,其表的欄位有消息內容,發送時間和發送者的名稱; SQL: CREATE TABLE `guanhui`.`message` ( `id` INT(10) NOT NULL AUTO_INCREMENT COMMENT '消息ID' , `content` VARCHAR(255) ...
  • 官網:http://www.axios-js.com/zh-cn/docs/ 什麼是 axios? Axios 是一個基於 promise 的 HTTP 庫,可以用在瀏覽器和 node.js 中。 特性 從瀏覽器中創建 XMLHttpRequests 從 node.js 創建 http 請求 支持  ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...