Diagram
![image](https://img2023.cnblogs.com/blog/1866596/202307/1866596-20230728101801138-171904977.png)
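1. Current situation
Scenario: suppose devices report several types of messages, and each type needs its own processing. Handling this with an if..else chain quickly becomes verbose and repetitive.
For example: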
if ("alarmEvent".equals(msg)) {
    // handle alarm messages ...
} else if ("deviceBase".equals(msg)) {
    // handle the basic information reported by the device ...
} else if ("heartBeat".equals(msg)) {
    // handle device heartbeat messages ...
} else {
    // ...
}
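2. Message handlers
The processing logic for each message type can live in its own implementation class. These classes all share the same behavior, so we can define an interface for them: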
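public interface MessageHandler<T> {
    T invoke(String bizUpMessage); // process a received message
    String getName();              // handler name, used in log output
    String getIdentification();    // identification of the message type this handler accepts
}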
Each message type then gets its own implementation.
Implementation 1:
@Component
public class DeviceBaseInfoHandler implements MessageHandler<String> {
    @Override
    public String invoke(String bizUpMessage) {
        // handle the basic information reported by the device ...
        return "";
    }

    @Override
    public String getName() {
        return "";
    }

    @Override
    public String getIdentification() {
        return "DeviceBaseInfoRequest";
    }
}
Implementation 2:
@Component
public class AlarmEventHandler implements MessageHandler<String> {
    @Override
    public String invoke(String bizUpMessage) {
        // handle alarm messages ...
        return null;
    }

    @Override
    public String getName() {
        return "";
    }

    @Override
    public String getIdentification() {
        return "AlarmEventRequest";
    }
}
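3. Handler registry
The handler objects from the previous step can be stored in a Map keyed by the message identification, so that each identification maps to the list of handlers registered for it: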
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MessageHandlerRegisty {
    // identification -> handlers registered for that message type
    private final Map<String, List<MessageHandler<?>>> registerMap = new HashMap<>();

    // register a handler under the given identification
    public void regist(String identification, MessageHandler<?> handler) {
        registerMap.computeIfAbsent(identification, k -> new ArrayList<>()).add(handler);
    }

    public boolean contains(String identification) {
        return registerMap.containsKey(identification);
    }

    // returns null when nothing is registered; callers check contains() first
    public List<MessageHandler<?>> get(String identification) {
        return registerMap.get(identification);
    }
}
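Because the registry stores a list per identification, one message type can fan out to several handlers. The stand-alone sketch below illustrates that lookup without Spring. `RegistrySketch`, its trimmed-down `Handler` interface, and the `handler` and `dispatch` helpers are hypothetical names introduced only for this illustration; they are not part of the original code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone demo; names are illustrative, not the post's classes.
final class RegistrySketch {

    /** Trimmed-down stand-in for the post's MessageHandler interface. */
    interface Handler {
        String invoke(String bizUpMessage);
        String getIdentification();
    }

    /** Same idea as MessageHandlerRegisty: identification -> handlers. */
    static final class Registry {
        private final Map<String, List<Handler>> handlers = new HashMap<>();

        void regist(Handler handler) {
            handlers.computeIfAbsent(handler.getIdentification(), k -> new ArrayList<>())
                    .add(handler);
        }

        List<Handler> get(String identification) {
            // An empty list (instead of null) keeps the dispatch loop simple.
            return handlers.getOrDefault(identification, Collections.emptyList());
        }
    }

    /** Builds a handler that prefixes the message with a label. */
    static Handler handler(String identification, String label) {
        return new Handler() {
            @Override public String invoke(String bizUpMessage) { return label + ":" + bizUpMessage; }
            @Override public String getIdentification() { return identification; }
        };
    }

    /** Runs every handler registered for the identification and collects the results. */
    static List<String> dispatch(Registry registry, String identification, String message) {
        List<String> results = new ArrayList<>();
        for (Handler h : registry.get(identification)) {
            results.add(h.invoke(message));
        }
        return results;
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.regist(handler("AlarmEventRequest", "store"));
        registry.regist(handler("AlarmEventRequest", "notify"));
        registry.regist(handler("DeviceBaseInfoRequest", "update"));

        System.out.println(dispatch(registry, "AlarmEventRequest", "m1")); // [store:m1, notify:m1]
        System.out.println(dispatch(registry, "HeartBeat", "m2"));         // []
    }
}
```

Handlers run in registration order here because each list is an `ArrayList`; an unknown identification simply yields no work.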
4. Message receiving service
The message receiving interface:
public interface IMessageService {
    void messageReceived(String bizUpMessage); // receive a message
}
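A received message has to be mapped to an identification before it can be routed. The implementation class in the next section calls an `extractIdentification` helper for this, but the post never shows it. The sketch below is one possible version under a loud assumption: the payload is JSON with a top-level string field named `identification`. The field name, the payload format, and the class name `IdentificationExtractorSketch` are all hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; the real payload format is not given in the post.
final class IdentificationExtractorSketch {

    // ASSUMPTION: the payload contains "identification":"<value>" as a string field.
    private static final Pattern IDENTIFICATION =
            Pattern.compile("\"identification\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the identification, or null when the message is null or has no such field. */
    static String extractIdentification(String bizUpMessage) {
        if (bizUpMessage == null) {
            return null;
        }
        Matcher m = IDENTIFICATION.matcher(bizUpMessage);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String msg = "{\"identification\":\"AlarmEventRequest\",\"data\":{\"level\":2}}";
        System.out.println(extractIdentification(msg));  // AlarmEventRequest
        System.out.println(extractIdentification("{}")); // null
    }
}
```

A regular expression is not a JSON parser (it would also match the field inside a nested object), so a real JSON library is the safer choice once the actual payload format is known. Returning null for a missing field fits the blank check that the service performs before dispatching.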
5. Message receiving service implementation
The implementation class of the message receiving interface:
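import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;     // assumed: Guava
import org.apache.commons.lang3.StringUtils; // assumed: Apache Commons Lang 3
import org.slf4j.Logger;                     // assumed: SLF4J, based on the {} placeholders
import org.slf4j.LoggerFactory;

public class MessageServiceImpl implements IMessageService {
    private static final Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);

    private MessageHandlerRegisty registy;

    // thread pool: 3 core threads, at most 10, 15 minutes keep-alive, queue of 1000 tasks
    private final ExecutorService executorService = new ThreadPoolExecutor(
            3, 10, 15, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1000), new LogdDiscardPolicy());

    @Override
    public void messageReceived(String bizUpMessage) {
        // read the identification from the message
        // (extractIdentification is not shown in the original post)
        String identification = extractIdentification(bizUpMessage);
        if (StringUtils.isBlank(identification)) {
            return;
        }
        if (!registy.contains(identification)) {
            return;
        }
        for (MessageHandler<?> handler : registy.get(identification)) {
            execute(handler, bizUpMessage); // process the message on the thread pool
        }
    }

    public MessageHandlerRegisty getRegisty() {
        return registy;
    }

    public void setRegisty(MessageHandlerRegisty registy) {
        this.registy = registy;
    }

    private void execute(MessageHandler<?> handler, String bizUpMessage) {
        executorService.submit(new IotMessageTask(handler, bizUpMessage));
    }

    // rejection policy: drop the task, but log a warning first
    public static final class LogdDiscardPolicy extends ThreadPoolExecutor.DiscardPolicy {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            log.warn("MessageService pool is full !!!!!!!!");
        }
    }

    // runs one handler against one message and logs how long it took
    static final class IotMessageTask implements Runnable {
        private final MessageHandler<?> handler;
        private final String bizUpMessage;

        public IotMessageTask(MessageHandler<?> handler, String bizUpMessage) {
            this.handler = handler;
            this.bizUpMessage = bizUpMessage;
        }

        @Override
        public void run() {
            try {
                Stopwatch stopwatch = Stopwatch.createStarted();
                handler.invoke(bizUpMessage);
                stopwatch.stop();
                long cost = stopwatch.elapsed(TimeUnit.MILLISECONDS); // elapsed time in ms
                logTime(cost);
            } catch (Exception e) {
                log.error("handler execute error - ", e);
            }
        }

        // warn when a handler takes longer than 50 ms
        private void logTime(long cost) {
            if (cost > 50) {
                log.warn("handler -> {}, cost too much -> {} ms", handler.getName(), cost);
            } else {
                log.info("handler -> {}, cost -> {} ms", handler.getName(), cost);
            }
        }
    }
}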
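The pool above combines a bounded queue with a rejection policy that drops work once both the queue and the threads are saturated. The stand-alone sketch below makes that behavior observable by counting rejections instead of logging them. `BoundedPoolSketch` and `countRejected` are hypothetical names, and the tiny pool sizes are chosen only to force rejections quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of a bounded pool with a "count and discard" rejection handler.
final class BoundedPoolSketch {

    /**
     * Submits `tasks` blocking tasks to a pool with `threads` threads and a queue
     * of `queueSize` slots, then returns how many submissions were rejected.
     */
    static int countRejected(int threads, int queueSize, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        // Same idea as LogdDiscardPolicy: drop the task, but leave a trace.
        RejectedExecutionHandler countAndDiscard = (r, e) -> rejected.incrementAndGet();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueSize), countAndDiscard);

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep workers busy until every task is submitted
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        // 1 thread + 1 queue slot can hold 2 tasks, so 3 of 5 submissions are dropped.
        System.out.println(countRejected(1, 1, 5)); // 3
    }
}
```

One consequence worth keeping in mind: when a task handed to `submit` is silently discarded, the returned `Future` never completes. The service above ignores that `Future`, so it is unaffected, but a caller that waits on the result would block.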
6. Registering the MessageServiceImpl object in the Spring container
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ManagerConfig {
    // message handlers
    @Autowired
    private DeviceBaseInfoHandler deviceBaseInfoHandler;
    @Autowired
    private AlarmEventHandler alarmEventHandler;

    @Bean
    public IMessageService messageService() {
        MessageServiceImpl messageService = new MessageServiceImpl();
        messageService.setRegisty(buildRegisty());
        return messageService;
    }

    // register every handler under its own identification
    private MessageHandlerRegisty buildRegisty() {
        MessageHandlerRegisty registy = new MessageHandlerRegisty();
        registy.regist(deviceBaseInfoHandler.getIdentification(), deviceBaseInfoHandler);
        registy.regist(alarmEventHandler.getIdentification(), alarmEventHandler);
        return registy;
    }
}
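With the configuration above, every new handler needs another `@Autowired` field and another `regist` call. Spring can also inject all beans of a given type as a `List`, so the registry could be built from that list instead. The sketch below shows only the grouping step in plain Java, with the list passed as an ordinary parameter. `AutoRegistrationSketch`, its trimmed-down `Handler` interface, and `buildRegistry` are hypothetical names; wiring the list in through Spring is an assumption about how this could be integrated, not something the original does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo: build the registry from a list of handlers instead of one field each.
final class AutoRegistrationSketch {

    /** Trimmed-down stand-in for the post's MessageHandler interface. */
    interface Handler {
        String getIdentification();
    }

    /**
     * Groups handlers by their identification. In a Spring application the list
     * could be supplied by injection, for example as a @Bean method parameter.
     */
    static Map<String, List<Handler>> buildRegistry(List<Handler> handlers) {
        Map<String, List<Handler>> registry = new LinkedHashMap<>();
        for (Handler h : handlers) {
            registry.computeIfAbsent(h.getIdentification(), k -> new ArrayList<>()).add(h);
        }
        return registry;
    }

    public static void main(String[] args) {
        Handler base = () -> "DeviceBaseInfoRequest";
        Handler alarm = () -> "AlarmEventRequest";
        Handler alarmAudit = () -> "AlarmEventRequest";

        Map<String, List<Handler>> registry =
                buildRegistry(Arrays.asList(base, alarm, alarmAudit));

        System.out.println(registry.keySet());                        // [DeviceBaseInfoRequest, AlarmEventRequest]
        System.out.println(registry.get("AlarmEventRequest").size()); // 2
    }
}
```

The trade-off is explicitness: the hand-written configuration shows at a glance which handlers are active, while the list-based variant picks up any new `@Component` handler without touching the configuration class.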