Netty構建分散式消息隊列實現原理淺析

来源:http://www.cnblogs.com/jietang/archive/2016/09/07/5847458.html
-Advertisement-
Play Games

在本人的上一篇博客文章:Netty構建分散式消息隊列(AvatarMQ)設計指南之架構篇 中,重點向大家介紹了AvatarMQ主要構成模塊以及目前存在的優缺點。最後以一個生產者、消費者傳遞消息的例子,具體演示了AvatarMQ所具備的基本消息路由功能。而本文的寫作目的,是想從開發、設計的角度,簡單的 ...


  在本人的上一篇博客文章:Netty構建分散式消息隊列(AvatarMQ)設計指南之架構篇 中,重點向大家介紹了AvatarMQ主要構成模塊以及目前存在的優缺點。最後以一個生產者、消費者傳遞消息的例子,具體演示了AvatarMQ所具備的基本消息路由功能。而本文的寫作目的,是想從開發、設計的角度,簡單的對如何使用Netty,構建分散式消息隊列背後的技術細節、原理,進行一下簡單的分析和說明。

  首先,在一個企業級的架構應用中,究竟何時需引入消息隊列呢?本人認為,最經常的情況,無非這幾種:做業務解耦、事件消息廣播、消息流控處理。其中,對於業務解耦是作為消息隊列,要解決的一個首要問題。所謂業務解耦,就是說在一個業務流程處理上,只關註具體的流程,盡到通知的責任即可,不必等待消息處理的結果。

  總得來看,企業級系統模塊通信的方式通常情況下,無非兩種。

  同步方式:REST、RPC方式實現;非同步方式:消息中間件(消息隊列)方式實現。

  同步方式的優點:可以基於http協議之上,無需中間件代理,系統架構相對而言比較簡單。缺點是:客戶端和服務端緊密耦合,並且要實時線上通信,否則會導致消息發送失敗。

  非同步方式的優點:客戶端和服務端互相解耦,雙方可以不產生依賴。缺點是:由於引入了消息中間件,在編程的時候會增加難度繫數。此外,消息中間件的可靠性、容錯性、健壯性往往成為這類架構的決定性因素。

  舉一個本人工作中的例子向大家說明一下:移動業務中的產品訂購中心,每當一個用戶通過某些渠道(營業廳、自助終端等等)開通、訂購了某個套餐之後,如果這些套餐涉及第三方平臺派單的話,產品訂購中心會向第三方平臺發起訂購請求操作。試想一下,如果遇到高峰受理時間段,由於業務受理量的激增,導致一些外圍系統的響應速度降低(比如業務網關響應速度不及時、網路延時等等原因),最終用戶開通一個套餐花在主流程的時間會延長很多,這個會造成極不好的用戶體驗,最終可能導致受理失敗。在上述的場景裡面,我們就可以很好的引入一個消息隊列進行業務的解耦,具體來說,產品訂購中心只要“通知”第三方平臺,我們的套餐開通成功了,並不一定非要同步阻塞地等待其真正的開通處理完成。正因為如此,消息隊列逐漸成為當下系統模塊通信的主要方式手段。

  當今在Java的消息隊列通信領域,有很多主流的消息中間件,比如RabbitMQ、ActiveMQ、以及炙手可熱Kafka。其中ActiveMQ是基於JMS的標準之上開發定製的一套消息隊列系統,性能穩定,訪問介面也非常友好,但是這類的消息隊列在訪問吞吐量上有所折扣;另外一個方面,比如Kafka這樣,以高效吞吐量著稱的消息隊列系統,但是在穩定性和可靠性上,能力似乎還不夠,因此更多的是用在服務日誌傳輸、短消息推送等等對於可靠性不高的業務場景之中。總結起來,不管是ActiveMQ還是Kafka,其框架的背後涉及到很多非同步網路通信、多線程、高併發處理方面的專業技術知識。但本文的重點,也不在於介紹這些消息中間件背後的技術細節,而是想重點闡述一下,如何透過上述消息隊列的基本原理,在必要的時候,開發定製一套符合自身業務要求的消息隊列系統時,能夠獲得更加全面的視角去設計、考量這些問題。

  因此本人用心開發實現了一個,基於Netty的消息隊列系統:AvatarMQ。當然,在設計、實現AvatarMQ的時候,我會適當參考這些成熟消息中間件中用到的很多重要的思想理念。

  當各位從github上面下載到AvatarMQ的源代碼的時候,可以發現,其中的包結構如下所示:

     

  現在對每個包的主要功能進行一下簡要說明(下麵省略首碼com.newlandframework.avatarmq)。

  broker:消息中間件的伺服器模塊,主要負責消息的路由、負載均衡,對於生產者、消費者進行消息的應答回覆處理(ACK),AvatarMQ中的中心節點,是連接生產者、消費者的橋梁紐帶。

  consumer:消息中間件中的消費者模塊,負責接收生產者過來的消息,在設計的時候,會對消費者進行一個集群化管理,同一個集群標識的消費者,會構成一個大的消費者集群,作為一個整體,接收生產者投遞過來的消息。此外,還提供消費者接收消息相關的API給客戶端進行調用。

  producer:消息中間件中的生產者模塊,負責生產特定主題(Topic)的消息,傳遞給對此主題感興趣的消費者,同時提供生產者生產消息的API介面,給客戶端使用。

  core:AvatarMQ中消息處理的核心模塊,負責消息的記憶體存儲、應答控制、對消息進行多線程任務分派處理。

  model:主要定義了AvatarMQ中的數據模型對象,比如MessageType消息類型、MessageSource消息源頭等等模型對象的定義。

  msg:主要定義了具體的消息類型對應的結構模型,比如消費者訂閱消息SubscribeMessage、消費者取消訂閱消息UnSubscribeMessage,消息伺服器應答給生產者的應答消息ProducerAckMessage、消息伺服器應答給消費者的應答消息ConsumerAckMessage。

  netty:主要封裝了Netty網路通信相關的核心模塊代碼,比如訂閱消息事件的路由分派策略、消息的編碼、解碼器等等。

  serialize:利用Kryo這個優秀高效的對象序列化、反序列框架對消息對象進行序列化網路傳輸。

  spring:Spring的容器管理類,負責把AvatarMQ中的消息伺服器模塊:Broker,進行容器化管理。這個包裡面的AvatarMQServerStartup是整個AvatarMQ消息伺服器的啟動入口。

  test:這個就不用多說了,就是針對AvatarMQ進行消息路由傳遞的測試demo。

  

  AvatarMQ運行原理示意圖:

  首先是消息生產者客戶端(AvatarMQ Producer)發送帶有主題的消息給消息轉發伺服器(AvatarMQ Broker),消息轉發伺服器確認收到生產者的消息,發送ACK應答給生產者,然後把消息繼續投遞給消費者(AvatarMQ Consumer)。同時broker伺服器接收來自消費者的訂閱、取消訂閱消息,併發送ACK應該給對應的消費者,整個消息系統就是這樣周而複始的工作。

 

  現在再來看一下,AvatarMQ中的核心模塊的組成,如下圖所示:

  

  Producer Manage:消息的生產者,其主要代碼在(com.newlandframework.avatarmq.producer)包之下,其主要代碼模塊關鍵部分簡要說明如下:

package com.newlandframework.avatarmq.producer;

import com.newlandframework.avatarmq.core.AvatarMQAction;
import com.newlandframework.avatarmq.model.MessageSource;
import com.newlandframework.avatarmq.model.MessageType;
import com.newlandframework.avatarmq.model.RequestMessage;
import com.newlandframework.avatarmq.model.ResponseMessage;
import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.msg.ProducerAckMessage;
import com.newlandframework.avatarmq.netty.MessageProcessor;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @filename:AvatarMQProducer.java
 * @description:AvatarMQProducer功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQProducer extends MessageProcessor implements AvatarMQAction {

    private boolean brokerConnect = false;
    private boolean running = false;
    private String brokerServerAddress;
    private String topic;
    private String defaultClusterId = "AvatarMQProducerClusters";
    private String clusterId = "";
    private AtomicLong msgId = new AtomicLong(0L);
    
    //連接消息轉發伺服器broker的ip地址,以及生產出來消息附帶的主題信息
    public AvatarMQProducer(String brokerServerAddress, String topic) {
        super(brokerServerAddress);
        this.brokerServerAddress = brokerServerAddress;
        this.topic = topic;
    }
    
    //沒有連接上消息轉發伺服器broker就發送的話,直接應答失敗
    private ProducerAckMessage checkMode() {
        if (!brokerConnect) {
            ProducerAckMessage ack = new ProducerAckMessage();
            ack.setStatus(ProducerAckMessage.FAIL);
            return ack;
        }

        return null;
    }
    
    //啟動消息生產者
    public void start() {
        super.getMessageConnectFactory().connect();
        brokerConnect = true;
        running = true;
    }
    
    //連接消息轉發伺服器broker,設定生產者消息處理鉤子,用於處理broker過來的消息應答
    public void init() {
        ProducerHookMessageEvent hook = new ProducerHookMessageEvent();
        hook.setBrokerConnect(brokerConnect);
        hook.setRunning(running);
        super.getMessageConnectFactory().setMessageHandle(new MessageProducerHandler(this, hook));
    }
    
    //投遞消息API
    public ProducerAckMessage delivery(Message message) {
        if (!running || !brokerConnect) {
            return checkMode();
        }

        message.setTopic(topic);
        message.setTimeStamp(System.currentTimeMillis());

        RequestMessage request = new RequestMessage();
        request.setMsgId(String.valueOf(msgId.incrementAndGet()));
        request.setMsgParams(message);
        request.setMsgType(MessageType.AvatarMQMessage);
        request.setMsgSource(MessageSource.AvatarMQProducer);
        message.setMsgId(request.getMsgId());

        ResponseMessage response = (ResponseMessage) sendAsynMessage(request);
        if (response == null) {
            ProducerAckMessage ack = new ProducerAckMessage();
            ack.setStatus(ProducerAckMessage.FAIL);
            return ack;
        }

        ProducerAckMessage result = (ProducerAckMessage) response.getMsgParams();
        return result;
    }
    
    //關閉消息生產者
    public void shutdown() {
        if (running) {
            running = false;
            super.getMessageConnectFactory().close();
            super.closeMessageConnectFactory();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getClusterId() {
        return clusterId;
    }

    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }
}

  

  Consumer Clusters Manage / Message Routing:消息的消費者集群管理以及消息路由模塊,其主要模塊在包(com.newlandframework.avatarmq.consumer)之中。其中消息消費者對象,對應的核心代碼主要功能描述如下:

package com.newlandframework.avatarmq.consumer;

import com.google.common.base.Joiner;
import com.newlandframework.avatarmq.core.AvatarMQAction;
import com.newlandframework.avatarmq.core.MessageIdGenerator;
import com.newlandframework.avatarmq.core.MessageSystemConfig;
import com.newlandframework.avatarmq.model.MessageType;
import com.newlandframework.avatarmq.model.RequestMessage;
import com.newlandframework.avatarmq.msg.SubscribeMessage;
import com.newlandframework.avatarmq.msg.UnSubscribeMessage;
import com.newlandframework.avatarmq.netty.MessageProcessor;

/**
 * @filename:AvatarMQConsumer.java
 * @description:AvatarMQConsumer功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class AvatarMQConsumer extends MessageProcessor implements AvatarMQAction {

    private ProducerMessageHook hook;
    private String brokerServerAddress;
    private String topic;
    private boolean subscribeMessage = false;
    private boolean running = false;
    private String defaultClusterId = "AvatarMQConsumerClusters";
    private String clusterId = "";
    private String consumerId = "";
    
    //連接的消息伺服器broker的ip地址以及關註的生產過來的消息鉤子
    public AvatarMQConsumer(String brokerServerAddress, String topic, ProducerMessageHook hook) {
        super(brokerServerAddress);
        this.hook = hook;
        this.brokerServerAddress = brokerServerAddress;
        this.topic = topic;
    }
    
    //向消息伺服器broker發送取消訂閱消息
    private void unRegister() {
        RequestMessage request = new RequestMessage();
        request.setMsgType(MessageType.AvatarMQUnsubscribe);
        request.setMsgId(new MessageIdGenerator().generate());
        request.setMsgParams(new UnSubscribeMessage(consumerId));
        sendSyncMessage(request);
        super.getMessageConnectFactory().close();
        super.closeMessageConnectFactory();
        running = false;
    }
    
    //向消息伺服器broker發送訂閱消息
    private void register() {
        RequestMessage request = new RequestMessage();
        request.setMsgType(MessageType.AvatarMQSubscribe);
        request.setMsgId(new MessageIdGenerator().generate());

        SubscribeMessage subscript = new SubscribeMessage();
        subscript.setClusterId((clusterId.equals("") ? defaultClusterId : clusterId));
        subscript.setTopic(topic);
        subscript.setConsumerId(consumerId);

        request.setMsgParams(subscript);

        sendAsynMessage(request);
    }
    
    public void init() {
        super.getMessageConnectFactory().setMessageHandle(new MessageConsumerHandler(this, new ConsumerHookMessageEvent(hook)));
        Joiner joiner = Joiner.on(MessageSystemConfig.MessageDelimiter).skipNulls();
        consumerId = joiner.join((clusterId.equals("") ? defaultClusterId : clusterId), topic, new MessageIdGenerator().generate());
    }
    
    //連接消息伺服器broker
    public void start() {
        if (isSubscribeMessage()) {
            super.getMessageConnectFactory().connect();
            register();
            running = true;
        }
    }

    public void receiveMode() {
        setSubscribeMessage(true);
    }

    public void shutdown() {
        if (running) {
            unRegister();
        }
    }

    public String getBrokerServerAddress() {
        return brokerServerAddress;
    }

    public void setBrokerServerAddress(String brokerServerAddress) {
        this.brokerServerAddress = brokerServerAddress;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public boolean isSubscribeMessage() {
        return subscribeMessage;
    }

    public void setSubscribeMessage(boolean subscribeMessage) {
        this.subscribeMessage = subscribeMessage;
    }

    public String getDefaultClusterId() {
        return defaultClusterId;
    }

    public void setDefaultClusterId(String defaultClusterId) {
        this.defaultClusterId = defaultClusterId;
    }

    public String getClusterId() {
        return clusterId;
    }

    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }
}

  消息的集群管理模塊,主要代碼是ConsumerContext.java、ConsumerClusters.java。先簡單說一下消費者集群模塊ConsumerClusters,主要負責定義消費者集群的行為,以及負責消息的路由。主要的功能描述如下所示:

package com.newlandframework.avatarmq.consumer;

import com.newlandframework.avatarmq.model.RemoteChannelData;
import com.newlandframework.avatarmq.model.SubscriptionData;
import com.newlandframework.avatarmq.netty.NettyUtil;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections.Predicate;

/**
 * @filename:ConsumerClusters.java
 * @description:ConsumerClusters功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class ConsumerClusters {
    
    //輪詢調度(Round-Robin Scheduling)位置標記
    private int next = 0;
    private final String clustersId;
    private final ConcurrentHashMap<String/*生產者消息的主題*/, SubscriptionData/*消息對應的topic信息數據結構*/> subMap
            = new ConcurrentHashMap<String, SubscriptionData>();

    private final ConcurrentHashMap<String/*消費者標識編碼*/, RemoteChannelData/*對應的消費者的netty網路通信管道信息*/> channelMap
            = new ConcurrentHashMap<String, RemoteChannelData>();

    private final List<RemoteChannelData> channelList = Collections.synchronizedList(new ArrayList<RemoteChannelData>());

    public ConsumerClusters(String clustersId) {
        this.clustersId = clustersId;
    }

    public String getClustersId() {
        return clustersId;
    }

    public ConcurrentHashMap<String, SubscriptionData> getSubMap() {
        return subMap;
    }

    public ConcurrentHashMap<String, RemoteChannelData> getChannelMap() {
        return channelMap;
    }
    
    //添加一個消費者到消費者集群
    public void attachRemoteChannelData(String clientId, RemoteChannelData channelinfo) {
        if (findRemoteChannelData(channelinfo.getClientId()) == null) {
            channelMap.put(clientId, channelinfo);
            subMap.put(channelinfo.getSubcript().getTopic(), channelinfo.getSubcript());
            channelList.add(channelinfo);
        } else {
            System.out.println("consumer clusters exists! it's clientId:" + clientId);
        }
    }
    
    //從消費者集群中刪除一個消費者
    public void detachRemoteChannelData(String clientId) {
        channelMap.remove(clientId);

        Predicate predicate = new Predicate() {
            public boolean evaluate(Object object) {
                String id = ((RemoteChannelData) object).getClientId();
                return id.compareTo(clientId) == 0;
            }
        };

        RemoteChannelData data = (RemoteChannelData) CollectionUtils.find(channelList, predicate);
        if (data != null) {
            channelList.remove(data);
        }
    }
    
    //根據消費者標識編碼,在消費者集群中查找定位一個消費者,如果不存在返回null
    public RemoteChannelData findRemoteChannelData(String clientId) {
        return (RemoteChannelData) MapUtils.getObject(channelMap, clientId);
    }
    
    //負載均衡,根據連接到broker的順序,依次投遞消息給消費者。這裡的均衡演算法直接採用
    //輪詢調度(Round-Robin Scheduling),後續可以加入:加權輪詢、隨機輪詢、哈希輪詢等等策略。
    public RemoteChannelData nextRemoteChannelData() {

        Predicate predicate = new Predicate() {
            public boolean evaluate(Object object) {
                RemoteChannelData data = (RemoteChannelData) object;
                Channel channel = data.getChannel();
                return NettyUtil.validateChannel(channel);
            }
        };

        CollectionUtils.filter(channelList, predicate);
        return channelList.get(next++ % channelList.size());
    }
    
    //根據生產者的主題關鍵字,定位於具體的消息結構
    public SubscriptionData findSubscriptionData(String topic) {
        return this.subMap.get(topic);
    }
}

  而ConsumerContext主要的負責管理消費者集群的,其主要核心代碼註釋說明如下:

package com.newlandframework.avatarmq.consumer;

import com.newlandframework.avatarmq.model.RemoteChannelData;
import com.newlandframework.avatarmq.model.SubscriptionData;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.iterators.FilterIterator;

/**
 * @filename:ConsumerContext.java
 * @description:ConsumerContext功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class ConsumerContext {
    //消費者集群關係定義
    private static final CopyOnWriteArrayList<ClustersRelation> relationArray = new CopyOnWriteArrayList<ClustersRelation>();
    //消費者集群狀態
    private static final CopyOnWriteArrayList<ClustersState> stateArray = new CopyOnWriteArrayList<ClustersState>();

    public static void setClustersStat(String clusters, int stat) {
        stateArray.add(new ClustersState(clusters, stat));
    }
    
    //根據消費者集群編碼cluster_id獲取一個消費者集群的狀態
    public static int getClustersStat(String clusters) {

        Predicate predicate = new Predicate() {
            public boolean evaluate(Object object) {
                String clustersId = ((ClustersState) object).getClusters();
                return clustersId.compareTo(clusters) == 0;
            }
        };

        Iterator iterator = new FilterIterator(stateArray.iterator(), predicate);

        ClustersState state = null;
        while (iterator.hasNext()) {
            state = (ClustersState) iterator.next();
            break;

        }
        return (state != null) ? state.getState() : 0;
    }
    
    //根據消費者集群編碼cluster_id查找一個消費者集群
    public static ConsumerClusters selectByClusters(String clusters) {
        Predicate predicate = new Predicate() {
            public boolean evaluate(Object object) {
                String id = ((ClustersRelation) object).getId();
                return id.compareTo(clusters) == 0;
            }
        };

        Iterator iterator = new FilterIterator(relationArray.iterator(), predicate);

        ClustersRelation relation = null;
        while (iterator.hasNext()) {
            relation = (ClustersRelation) iterator.next();
            break;
        }

        return (relation != null) ? relation.getClusters() : null;
    }
    
    //查找一下關註這個主題的消費者集群集合
    public static List<ConsumerClusters> selectByTopic(String topic) {

        List<ConsumerClusters> clusters = new ArrayList<ConsumerClusters>();

        for (int i = 0; i < relationArray.size(); i++) {
            ConcurrentHashMap<String, SubscriptionData> subscriptionTable = relationArray.get(i).getClusters().getSubMap();
            if (subscriptionTable.containsKey(topic)) {
                clusters.add(relationArray.get(i).getClusters());
            }
        }

        return clusters;
    }
    
    //添加消費者集群
    public static void addClusters(String clusters, RemoteChannelData channelinfo) {
        ConsumerClusters manage = selectByClusters(clusters);
        if (manage == null) {
            ConsumerClusters newClusters = new ConsumerClusters(clusters);
            newClusters.attachRemoteChannelData(channelinfo.getClientId(), channelinfo);
            relationArray.add(new ClustersRelation(clusters, newClusters));
        } else if (manage.findRemoteChannelData(channelinfo.getClientId()) != null) {
            manage.detachRemoteChannelData(channelinfo.getClientId());
            manage.attachRemoteChannelData(channelinfo.getClientId(), channelinfo);
        } else {
            String topic = channelinfo.getSubcript().getTopic();
            boolean touchChannel = manage.getSubMap().containsKey(topic);
            if (touchChannel) {
                manage.attachRemoteChannelData(channelinfo.getClientId(), channelinfo);
            } else {
                manage.getSubMap().clear();
                manage.getChannelMap().clear();
                manage.attachRemoteChannelData(channelinfo.getClientId(), channelinfo);
            }
        }
    }
    
    //從一個消費者集群中刪除一個消費者
    public static void unLoad(String clientId) {

        for (int i = 0; i < relationArray.size(); i++) {
            String id = relationArray.get(i).getId();
            ConsumerClusters manage = relationArray.get(i).getClusters();

            if (manage.findRemoteChannelData(clientId) != null) {
                manage.detachRemoteChannelData(clientId);
            }

            if (manage.getChannelMap().size() == 0) {
                ClustersRelation relation = new ClustersRelation();
                relation.setId(id);
                relationArray.remove(id);
            }
        }
    }
}

  

  ACK Queue Dispatch:主要是broker分別向對應的消息生產者、消費者發送ACK消息應答,其主要核心模塊是在:com.newlandframework.avatarmq.broker包下麵的AckPullMessageController和AckPushMessageController模塊,主要職責是在broker中收集生產者的消息,確認成功收到之後,把其放到消息隊列容器中,然後專門安排一個工作線程池把ACK應答發送給生產者。

 

  Message Queue Dispatch:生產者消息的分派,主要是由com.newlandframework.avatarmq.broker包下麵的SendMessageController派發模塊進行任務的分派,其中消息分派支持兩種策略,一種是記憶體緩衝消息區裡面只要一有消息就通知消費者;還有一種是對消息進行緩衝處理,累計到一定的數量之後進行派發,這個是根據:MessageSystemConfig類中的核心參數:SystemPropertySendMessageControllerTaskCommitValue(com.newlandframework.avatarmq.system.send.taskcommit)決定的,預設是1。即一有消息就派發,如果改成大於1的數值,表示消息緩衝的數量。現在給出SendMessageController的核心實現代碼:

package com.newlandframework.avatarmq.broker;

import com.newlandframework.avatarmq.core.SemaphoreCache;
import com.newlandframework.avatarmq.core.MessageSystemConfig;
import com.newlandframework.avatarmq.core.MessageTaskQueue;
import com.newlandframework.avatarmq.core.SendMessageCache;
import com.newlandframework.avatarmq.model.MessageDispatchTask;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * @filename:SendMessageController.java
 * @description:SendMessageController功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class SendMessageController implements Callable<Void> {

    private volatile boolean stoped = false;

    private AtomicBoolean flushTask = new AtomicBoolean(false);

    private ThreadLocal<ConcurrentLinkedQueue<MessageDispatchTask>> requestCacheList = new ThreadLocal<ConcurrentLinkedQueue<MessageDispatchTask>>() {
        protected ConcurrentLinkedQueue<MessageDispatchTask> initialValue() {
            return new ConcurrentLinkedQueue<MessageDispatchTask>();
        }
    };

    private final Timer timer = new Timer("SendMessageTaskMonitor", true);

    public void stop() {
        stoped = true;
    }

    public boolean isStoped() {
        return stoped;
    }

    public Void call() {
        int period = MessageSystemConfig.SendMessageControllerPeriodTimeValue;
        int commitNumber = MessageSystemConfig.SendMessageControllerTaskCommitValue;
        int sleepTime = MessageSystemConfig.SendMessageControllerTaskSleepTimeValue;

        ConcurrentLinkedQueue<MessageDispatchTask> queue = requestCacheList.get();
        SendMessageCache ref = SendMessageCache.getInstance();

        while (!stoped) {
            SemaphoreCache.acquire(MessageSystemConfig.NotifyTaskSemaphoreValue);
            MessageDispatchTask task = MessageTaskQueue.getInstance().getTask();

            queue.add(task);

            if (queue.size() == 0) {
                try {
                    Thread.sleep(sleepTime);
                    continue;
                } catch (InterruptedException ex) {
                    Logger.getLogger(SendMessageController.class.getName()).log(Level.SEVERE, null, ex);
                }
            }

            if (queue.size() > 0 && (queue.size() % commitNumber == 0 || flushTask.get() == true)) {
                ref.commit(queue);
                queue.clear();
                flushTask.compareAndSet(true, false);
            }

            timer.scheduleAtFixedRate(new TimerTask() {

                public void run() {
                    try {
                        flushTask.compareAndSet(false, true);
                    } catch (Exception e) {
                        System.out.println("SendMessageTaskMonitor happen exception");
                    }
                }
            }, 1000 * 1, period);
        }
        
        return null;
    }
}

  消息分派採用多線程並行派發,其內部通過柵欄機制,為消息派發設置一個屏障點,後續可以暴露給JMX介面,進行對整個消息系統,消息派發情況的動態監控。比如發現消息積壓太多,可以加大線程並行度。消息無堆積的話,降低線程並行度,減輕系統負荷。現在給出消息派發任務模塊SendMessageTask的核心代碼:

package com.newlandframework.avatarmq.core;

import com.newlandframework.avatarmq.msg.ConsumerAckMessage;
import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.broker.SendMessageLauncher;
import com.newlandframework.avatarmq.consumer.ClustersState;
import com.newlandframework.avatarmq.consumer.ConsumerContext;
import com.newlandframework.avatarmq.model.MessageType;
import com.newlandframework.avatarmq.model.RequestMessage;
import com.newlandframework.avatarmq.model.ResponseMessage;
import com.newlandframework.avatarmq.model.RemoteChannelData;
import com.newlandframework.avatarmq.model.MessageSource;
import com.newlandframework.avatarmq.model.MessageDispatchTask;
import com.newlandframework.avatarmq.netty.NettyUtil;
import java.util.concurrent.Callable;
import java.util.concurrent.Phaser;

/**
 * @filename:SendMessageTask.java
 * @description:SendMessageTask功能模塊
 * @author tangjie<https://github.com/tang-jie>
 * @blog http://www.cnblogs.com/jietang/
 * @since 2016-8-11
 */
public class SendMessageTask implements Callable<Void> {

    private MessageDispatchTask[] tasks;
    //消息柵欄器,為後續進行消息JMX實時監控預留介面
    private Phaser phaser = null;
    private SendMessageLauncher launcher = SendMessageLauncher.getInstance();

    public SendMessageTask(Phaser phaser, MessageDispatchTask[] tasks) {
        this.phaser = phaser;
        this.tasks = tasks;
    }

    public Void call() throws Exception {
        for (MessageDispatchTask task : tasks) {
            Message msg = task.getMessage();

            if (ConsumerContext.selectByClusters(task.getClusters()) != null) {
                RemoteChannelData channel = ConsumerContext.selectByClusters(task.getClusters()).nextRemoteChannelData();

                ResponseMessage response = new ResponseMessage();
                response.setMsgSource(MessageSource.AvatarMQBroker);
                response.setMsgType(MessageType.AvatarMQMessage);
                response.set

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1. 使用pymysql庫:import pymysql; 2. ...
  • 官方文檔:https://docs.python.org/3/library/exceptions.html 1. 使用try...except... 2. 輸出錯誤信息的方式為: 3. ...
  • 使用php和mysql開髮網站的話,phpmyadmin是一個非常友好的mysql管理工具,並且免費開源,國內很多虛擬主機都自帶這樣的管理工具,配置很簡單,接下來在linux伺服器上配置phpmyadmin來管理MySQL資料庫 首先訪問phpmyadmin官網首頁,網址為:http://www.p ...
  • Given an array and a value, remove all instances of that value in place and return the new length. Do not allocate extra space for another array, you ...
  • 在編譯FFmpeg的時候,用./configure 進行配置,經常會出現找不到庫文件的情況,原因大概就兩個: 1、沒有安裝庫文件或者安裝的庫文件版本不對 2、FFmpeg沒有找到庫文件 前者的問題好解決,只要安裝相應的庫就好了,但是安裝好相應的庫以後,一般還會掉入後者那個坑。 後者要解決也很簡單,只 ...
  • 在Java 8 之前,HashMap和其他基於map的類都是通過鏈地址法解決衝突,它們使用單向鏈表來存儲相同索引值的元素。在最壞的情況下,這種方式會將HashMap的get方法的性能從O(1)降低到O(n)。為瞭解決在頻繁衝突時hashmap性能降低的問題,Java 8中使用平衡樹來替代鏈表存儲衝突 ...
  • 這次說一說面向對象與面向過程的區別以及面向對象的優點。 聽一聽用面向過程思想編程的程式員寫程式時的心聲:What are the things this program has to do? What procedures do we need? 啊哈,作為一名合格的準java程式員的我的想法是:W ...
  • 線程池(Thread Pool)對於限制應用程式中同一時刻運行的線程數很有用。因為每啟動一個新線程都會有相應的性能開銷,每個線程都需要給棧分配一些記憶體等等。 我們可以把併發執行的任務傳遞給一個線程池,來替代為每個併發執行的任務都啟動一個新的線程。只要池裡有空閑的線程,任務就會分配給一個線程執行。線上 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...