如果熟悉C 語言的小伙伴們一般都會知道委托、事件的好處,只需在某個類中提前定義好公開的委托或事件(委托的特殊表現形式)變數,然後在其它類中就可以很隨意的訂閱該委托或事件,當委托或事件被觸發執行時,會自動通知所有的訂閱者進行消費處理。(觀察者模式用委托來實現是最好不過了,DDD所提倡的事件驅動其根本理 ...
如果熟悉C#語言的小伙伴們一般都會知道委托、事件的好處,只需在某個類中提前定義好公開的委托或事件(委托的特殊表現形式)變數,然後在其它類中就可以很隨意的訂閱該委托或事件,當委托或事件被觸發執行時,會自動通知所有的訂閱者進行消費處理。(觀察者模式用委托來實現是最好不過了,DDD所提倡的事件驅動其根本理念也是如此),當然我這裡想到的是不需要在每個類中進行定義委托或事件,而是由一個統一的中介者(即EventPublishSubscribeUtils)來提供事件的訂閱及發佈操作,這樣各模塊之間無需直接依賴,只需通過中介者完成發佈通知與訂閱回調即可,何樂而不為呢?
這裡我先藉助C#語言獨有的委托類型快速實現了一個簡易的EventPublishSubscribeUtils,代碼如下:
/// <summary>
/// 自定義事件發佈訂閱回調工具類(業務解藕、關註點分離,避免互相依賴)--演示版
/// EventBus簡化版,觀察者模式
/// author:zuowenjun
/// </summary>
public static class EventPublishSubscribeUtils
{
private static ConcurrentDictionary<Type, EventHandler<object>> EventHandlers { get; } = new ConcurrentDictionary<Type, EventHandler<object>>();
private static void removeRegisters(ref EventHandler<object> srcEvents, EventHandler<object> removeTargetEvents)
{
var evtTypes = removeTargetEvents.GetInvocationList().Select(d => d.GetType());
var registeredEventHandlers = Delegate.Combine(srcEvents.GetInvocationList().Where(ei => evtTypes.Contains(ei.GetType())).ToArray());
srcEvents -= (EventHandler<object>)registeredEventHandlers;
}
public static void Register<T>(EventHandler<object> eventHandlers)
{
EventHandlers.AddOrUpdate(typeof(T), eventHandlers,
(t, e) =>
{
//先根據訂閱委托類型匹匹配過濾掉之前已有的相同訂閱,然後再重新訂閱,防止重覆訂閱,多次執行的情況。
removeRegisters(ref e, eventHandlers);
e += eventHandlers;
return e;
});
}
public static void UnRegister<T>(EventHandler<object> eventHandlers = null)
{
Type eventMsgType = typeof(T);
if (eventHandlers == null)
{
EventHandlers.TryRemove(eventMsgType, out eventHandlers);
return;
}
var e = EventHandlers[eventMsgType];
removeRegisters(ref e, eventHandlers);
}
public static void PublishEvent<T>(T eventMsg, object sender)
{
Type eventMsgType = eventMsg.GetType();
if (EventHandlers.ContainsKey(eventMsgType))
{
EventHandlers[eventMsgType].Invoke(sender, eventMsg);
}
}
}
然後使用就比較簡單了,我們只需通過EventPublishSubscribeUtils.Register註冊訂閱事件消息,通過EventPublishSubscribeUtils.PublishEvent發佈事件通知,這樣就可以讓兩個甚至多個不相關的模塊(類)能夠通過消息類型實現1對多的通訊與協同處理。使用示例代碼如下:
class EventMessage
{
public string Name { get; set; }
public string Msg { get; set; }
public DateTime CreatedDate { get; set; }
}
class DemoA
{
public DemoA()
{
EventHandler<object> eventHandlers = EventCallback1;
eventHandlers += EventCallback2;
EventPublishSubscribeUtils.Register<EventMessage>(eventHandlers);
}
private void EventCallback1(object sender, object e)
{
string json = JsonConvert.SerializeObject(e);
System.Diagnostics.Debug.WriteLine($"EventCallback1=> sender:{sender},e:{json}");
}
private void EventCallback2(object sender, object e)
{
string json = JsonConvert.SerializeObject(e);
System.Diagnostics.Debug.WriteLine($"EventCallback2=> sender:{sender},e:{json}");
}
}
class DemoB
{
public void ShowMsg(string name, string msg)
{
System.Diagnostics.Debug.WriteLine($"ShowMsg=> name:{name},msg:{msg}");
var eventMsg = new EventMessage
{
Name = name,
Msg = msg,
CreatedDate = DateTime.Now
};
EventPublishSubscribeUtils.PublishEvent(eventMsg, nameof(DemoB.ShowMsg));
}
}
//main方法中使用:
var demoA = new DemoA();
var demoB = new DemoB();
demoB.ShowMsg("夢在旅途", "i love csharp and java!");
從上述示例代碼中可以看出,DemoA與DemoB各為獨立,互不依賴,它們都不知道有對方的存在,它們只關心業務的處理,通過執行demoB.ShowMsg方法進而觸發回調demoA.EventCallback1,demoA.EventCallback2方法,是不是比起直接從DemoA中調DemoB更好呢?
c#有委托類型(方法的引用),那如果是在java中該如何實現呢?
其實同理,我們可以藉助匿名內部類+匿名實現類的方式(如:函數式介面)實現與C#異曲同工的效果,同樣可以實現類似的事件發佈與訂閱功能,如下便是採用java語言的實現EventPublishSubscribeUtils類的代碼:
這個因項目需要,我特意實現了兩種模式,一種支持1對多的普通方式,另一種支持1對1的訂閱回調方式,有返回值。
/**
* 自定義事件發佈訂閱回調工具類(業務解藕、關註點分離,避免互相依賴)
* EventBus簡化版,觀察者模式
* <pre>
* 支持兩種模式
* 1.無返回值:訂閱事件消費(register)+ 發佈事件消息(publishEvent/publishEventAsync)
* 2.有返回值:監聽回調通知處理(listenCallback)+通知回調(notifyCallback),通過notifyMessageType+MessageChannel 即可標識唯一的一組通知回調與監聽回調處理
* <pre>
* @author zuowenjun
* @date 20200310
*/
public final class EventPublishSubscribeUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(EventPublishSubscribeUtils.class);
private static final Map<Class<?>, LinkedList<Consumer<Object>>> eventConsumers = new ConcurrentHashMap<>();
private static final Map<Class<?>, ConcurrentHashMap<MessageChannel, Function<Object, Object>>> callbackFuncs = new ConcurrentHashMap<>();
private EventPublishSubscribeUtils() {
}
/**
* 註冊事件回調消費者
* 用法:EventSubscribeConsumeUtils.register(this::xxxx方法) 或lambda表達式
* 註意:若回調方法添加了事務註解,則應指派其代理對象的方法來完成回調,如:
* EventSubscribeConsumeUtils.register((xxxService)SpringUtils.getBean(this.class)::xxxx方法)
*
* @param eventConsumer
*/
public static void register(Class<?> eventMessageType, Consumer<Object> eventConsumer) {
if (eventConsumer == null) {
return;
}
LinkedList<Consumer<Object>> eventConsumerItems = null;
if (!eventConsumers.containsKey(eventMessageType)) {
eventConsumers.putIfAbsent(eventMessageType, new LinkedList<>());
}
eventConsumerItems = eventConsumers.get(eventMessageType);
eventConsumerItems.add(eventConsumer);
}
/**
* 取消訂閱回調
*
* @param eventMessageType
* @param eventConsumer
*/
public static void unRegister(Class<?> eventMessageType, Consumer<Object> eventConsumer) {
if (!eventConsumers.containsKey(eventMessageType)) {
return;
}
LinkedList<Consumer<Object>> eventConsumerItems = eventConsumers.get(eventMessageType);
int eventConsumerIndex = eventConsumerItems.indexOf(eventConsumer);
if (eventConsumerIndex == -1) {
return;
}
eventConsumerItems.remove(eventConsumerIndex);
}
/**
* 發佈事件,同步觸發執行回調事件消費者方法(存在阻塞等待),即事件消息生產者
* 用法:在需要觸發事件消息回調時調用,如:publishEvent(eventMessage);
*
* @param eventMessage
*/
public static <T> void publishEvent(T eventMessage) {
Class<?> eventMessageType = eventMessage.getClass();
if (!eventConsumers.containsKey(eventMessageType)) {
return;
}
LOGGER.info("事件已發佈,正在執行通知消費:{}", JSONObject.toJSONString(eventMessage));
for (Consumer<Object> eventConsumer : eventConsumers.get(eventMessageType)) {
try {
eventConsumer.accept(eventMessage);
} catch (Exception ex) {
LOGGER.error("eventConsumer.accept error:{},eventMessageType:{},eventMessage:{}",
ex, eventMessageType, JSONObject.toJSONString(eventMessage));
}
}
}
/**
* 發佈事件,非同步觸發執行回調事件消費者方法(非同步非阻塞),即事件消息生產者
* 用法:在需要觸發事件消息回調時調用,如:publishEventAsync(eventMessage);
*
* @param eventMessage
*/
public static <T> void publishEventAsync(final T eventMessage) {
Executor asyncTaskExecutor = (Executor) SpringUtils.getBean("asyncTaskExecutor");
asyncTaskExecutor.execute(() -> {
publishEvent(eventMessage);
});
}
/**
* 監聽回調處理(需要有返回值),即有返回值的回調消費者
*
* @param notifyMessageType
* @param messageChannel
* @param callbackFunc
*/
public static void listenCallback(Class<?> notifyMessageType, MessageChannel messageChannel, Function<Object, Object> callbackFunc) {
if (!callbackFuncs.containsKey(notifyMessageType)) {
callbackFuncs.putIfAbsent(notifyMessageType, new ConcurrentHashMap<>());
}
Map<MessageChannel, Function<Object, Object>> functionMap = callbackFuncs.get(notifyMessageType);
if (!functionMap.containsKey(messageChannel)) {
functionMap.putIfAbsent(messageChannel, callbackFunc);
} else {
LOGGER.error("該通知消息類型:{}+消息通道:{},已被訂閱監聽,重覆訂閱監聽無效!", notifyMessageType.getSimpleName(), messageChannel.getDescription());
}
}
/**
* 通知回調(同步等待獲取監聽回調的處理結果),即生產者
*
* @param notifyMessage
* @param messageChannel
* @param <R>
* @return
*/
@SuppressWarnings("unchecked")
public static <R> R notifyCallback(Object notifyMessage, MessageChannel messageChannel) {
Class<?> notifyMessageType = notifyMessage.getClass();
Map<MessageChannel, Function<Object, Object>> functionMap = callbackFuncs.getOrDefault(notifyMessageType, null);
if (functionMap != null) {
Function<Object, Object> callbackFunction = functionMap.getOrDefault(messageChannel, null);
if (callbackFunction != null) {
LOGGER.info("通知回調消息已發佈,正在執行回調處理:{},messageChannel:[{}]", JSONObject.toJSONString(notifyMessage), messageChannel.getDescription());
Object result = callbackFunction.apply(notifyMessage);
try {
return (R) result;
} catch (ClassCastException castEx) {
throw new ClassCastException(String.format("監聽回調處理後返回值實際類型與發佈通知回調待接收的值預期類型不一致,導致類型轉換失敗:%s," +
"請確保notifyCallback與listenCallback針對通知消息類型:%s+消息通道:%s返回值類型必需一致。",
castEx.getMessage(), notifyMessageType.getSimpleName(), messageChannel.getDescription()));
}
}
}
return null;
}
}
當然如果需要實現1對1的通訊,除了指定消息類型外,還需要指定消息通訊通道(即:唯一標識),目的是可以實現同一種消息類型,支持不同的點對點的處理。
/**
* 自定義消息通道
* 作用:用於識別同一個消息類型下不同的監聽回調者(notifyMessage+messageChannel 即可標識唯一的一組通知回調[生產者]與監聽回調[消費者])
* @author zuowenjun
* @date 2020-03-31
*/
public enum MessageChannel {
None("無效"),
MSG_A("測試消息A"),
;
private String description;
MessageChannel(String description) {
this.description=description;
}
public String getDescription() {
return description;
}
}
使用方法示例代碼如下:
@Service
public class DemoAService {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoAService.class);
public void showMsg(String name, String msg) {
System.out.printf("【%1$tF %1$tT.%1$tL】hello!%s,DemoAService showMsg:%s %n", new Date(), name, msg);
EventMessage eventMessage = new EventMessage();
eventMessage.setName("aaa");
eventMessage.setMsg("test");
eventMessage.setCreatedDate(new Date());
EventPublishSubscribeUtils.publishEvent(eventMessage);
String msgJsonStr = EventPublishSubscribeUtils.notifyCallback(eventMessage, MessageChannel.MSG_A);
System.out.printf("【%1$tF %1$tT.%1$tL】DemoAService showMsg notifyCallback json result:%2$s %n", new Date(), msgJsonStr);
}
}
@Service
public class DemoBService {
@PostConstruct
public void init(){
//訂閱消費,無返回值,支持1對多,即:同一個消息類型可同時被多個消費者訂閱
EventPublishSubscribeUtils.register(EventMessage.class,this::showFinishedMsg);
//訂閱監聽回調,有返回值,只能1對1
EventPublishSubscribeUtils.listenCallback(EventMessage.class, MessageChannel.MSG_A,this::getMsgCallbak);
}
private void showFinishedMsg(Object eventMsg){
EventMessage eventMessage=(EventMessage)eventMsg;
System.out.printf("【%1$tF %1$tT.%1$tL】%s,receive msg:%s doing...%n",
eventMessage.getCreatedDate(),eventMessage.getName(),eventMessage.getMsg());
//模擬邏輯處理
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("【%1$tF %1$tT.%1$tL】%s,do finished!!!%n",new Date(),eventMessage.getName());
}
private String getMsgCallbak(Object eventMsg){
EventMessage eventMessage=(EventMessage)eventMsg;
eventMessage.setMsg(eventMessage.getMsg()+"--callback added!");
eventMessage.setCreatedDate(new Date());
System.out.printf("【%1$tF %1$tT.%1$tL】%s,do msg callback!!!%n",new Date(),eventMessage.getName());
return JSONObject.toJSONString(eventMessage);
}
}
//在某個入口的service中調用(如需演示可定義實現ApplicationRunner的run方法,在run方法中執行調用即可):
@Autowired
private DemoAService demoAService;
demoAService.showMsg("zwj","i love java 2020!");
如上代碼所示,我們藉助於EventPublishSubscribeUtils,解耦了兩個Service Bean之間的依賴,避免了迴圈依賴的問題,去掉了之前為瞭解決迴圈依賴而使用@Lazy註解的方式,更易於擴展與更改。其實Spring底層也使用了類似的Event機制,說明這種方式還是有合適的用武之地的。
這裡我通過簡單的關係圖來對比未引用EventPublishSubscribeUtils前與引用後的區別,大家可以感受一下哪種更方便:
之前:
之後:
最後,關於業務解耦,分清業務邊界,我個人認為跨進程通訊使用MQ,同進程跨多模塊(類,或者說跨多業務邊界)可使用Event事件驅動思路來解決。大家覺得如何呢?如果有更好的方案歡迎評論交流,謝謝。