大家好,我是三友~~ 這篇文章我準備來聊一聊RocketMQ消息的一生。 不知你是否跟我一樣,在使用RocketMQ的時候也有很多的疑惑: 消息是如何發送的,隊列是如何選擇的?消息是如何存儲的,是如何保證讀寫的高性能?RocketMQ是如何實現消息的快速查找的?RocketMQ是如何實現高可用的?消 ...
大家好,我是三友~~
這篇文章我準備來聊一聊RocketMQ消息的一生。
不知你是否跟我一樣,在使用RocketMQ的時候也有很多的疑惑:
消息是如何發送的,隊列是如何選擇的? 消息是如何存儲的,是如何保證讀寫的高性能? RocketMQ是如何實現消息的快速查找的? RocketMQ是如何實現高可用的? 消息是在什麼時候會被清除? ...
本文就通過探討上述問題來探秘消息在RocketMQ中短暫而又精彩的一生。
如果你還沒用過RocketMQ,可以看一下這篇文章 RocketMQ保姆級教程
核心概念
NameServer:可以理解為是一個註冊中心,主要是用來保存topic路由信息,管理Broker。在NameServer的集群中,NameServer與NameServer之間是沒有任何通信的。
Broker:核心的一個角色,主要是用來保存消息的,在啟動時會向NameServer進行註冊。Broker實例可以有很多個,相同的BrokerName可以稱為一個Broker組,每個Broker組只保存一部分消息。
topic:可以理解為一個消息的集合的名字,一個topic可以分佈在不同的Broker組下。
隊列(queue):一個topic可以有很多隊列,預設是一個topic在同一個Broker組中是4個。如果一個topic現在在2個Broker組中,那麼就有可能有8個隊列。
生產者:生產消息的一方就是生產者
生產者組:一個生產者組可以有很多生產者,只需要在創建生產者的時候指定生產者組,那麼這個生產者就在那個生產者組
消費者:用來消費生產者消息的一方
消費者組:跟生產者一樣,每個消費者都有所在的消費者組,一個消費者組可以有很多的消費者,不同的消費者組消費消息是互不影響的。
消息誕生與發送
我們都知道,消息是由業務系統在運行過程產生的,當我們的業務系統產生了消息,我們就可以調用RocketMQ提供的API向RocketMQ發送消息,就像下麵這樣
DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
//指定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
//啟動生產者
producer.start();
//省略代碼。。
Message msg = new Message("sanyouTopic", "TagA", "三友的java日記 ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發送消息並得到消息的發送結果,然後列印
SendResult sendResult = producer.send(msg);
雖然代碼很簡單,我們不經意間可能會思考如下問題:
代碼中只設置了NameServer的地址,那麼生產者是如何知道Broker所在機器的地址,然後向Broker發送消息的? 一個topic會有很多隊列,那麼生產者是如何選擇哪個隊列發送消息? 消息一旦發送失敗了怎麼辦?
路由表
當Broker在啟動的過程中,Broker就會往NameServer註冊自己這個Broker的信息,這些信息就包括自身所在伺服器的ip和埠,還有就是自己這個Broker有哪些topic和對應的隊列信息,這些信息就是路由信息,後面就統一稱為路由表。
當生產者啟動的時候,會從NameServer中拉取到路由表,緩存到本地,同時會開啟一個定時任務,預設是每隔30s從NameServer中重新拉取路由信息,更新本地緩存。
隊列的選擇
好了通過上一節我們就明白了,原來生產者會從NameServer拉取到Broker的路由表的信息,這樣生產者就知道了topic對應的隊列的信息了。
但是由於一個topic可能會有很多的隊列,那麼應該將消息發送到哪個隊列上呢?
面對這種情況,RocketMQ提供了兩種消息隊列的選擇演算法。
輪詢演算法 最小投遞延遲演算法
輪詢演算法 就是一個隊列一個隊列發送消息,這些就能保證消息能夠均勻分佈在不同的隊列底下,這也是RocketMQ預設的隊列選擇演算法。
但是由於機器性能或者其它情況可能會出現某些Broker上的Queue可能投遞延遲較嚴重,這樣就會導致生產者不能及時發消息,造成生產者壓力過大的問題。所以RocketMQ提供了最小投遞延遲演算法。
最小投遞延遲演算法 每次消息投遞的時候會統計投遞的時間延遲,在選擇隊列的時候會優先選擇投遞延遲時間小的隊列。這種演算法可能會導致消息分佈不均勻的問題。
如果你想啟用最小投遞延遲演算法,只需要按如下方法設置一下即可。
producer.setSendLatencyFaultEnable(true);
當然除了上述兩種隊列選擇演算法之外,你也可以自定義隊列選擇演算法,只需要實現MessageQueueSelector介面,在發送消息的時候指定即可。
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//從mqs中選擇一個隊列
return null;
}
}, new Object());
MessageQueueSelector RocketMQ也提供了三種實現
隨機演算法 Hash演算法 根據機房選擇演算法(空實現)
其它特殊情況處理
發送異常處理
終於,不論是通過RocketMQ預設的隊列選擇演算法也好,又或是自定義隊列選擇演算法也罷,終於選擇到了一個隊列,那麼此時就可以跟這個隊列所在的Broker機器建立網路連接,然後通過網路請求將消息發送到Broker上。
但是不幸的事發生了,Broker掛了,又或者是機器負載太高了,發送消息超時了,那麼此時RockerMQ就會進行重試。
RockerMQ重試其實很簡單,就是重新選擇其它Broker機器中的一個隊列進行消息發送,預設會重試兩次。
當然如果你的機器比較多,可以將設置重試次數設置大點。
producer.setRetryTimesWhenSendFailed(10);
消息過大的處理
一般情況下,消息的內容都不會太大,但是在一些特殊的場景中,消息內容可能會出現很大的情況。
遇到這種消息過大的情況,比如在預設情況下消息大小超過4M的時候,RocketMQ是會對消息進行壓縮之後再發送到Broker上,這樣在消息發送的時候就可以減少網路資源的占用。
消息存儲
好了,經過以上環節Broker終於成功接收到了生產者發送的消息了,但是為了能夠保證Broker重啟之後消息也不丟失,此時就需要將消息持久化到磁碟。
如何保證高性能讀寫
由於涉及到消息持久化操作,就涉及到磁碟數據的讀寫操作,那麼如何實現文件的高性能讀寫呢?這裡就不得不提到的一個叫零拷貝的技術。
傳統IO讀寫方式
說零拷貝之前,先說一下傳統的IO讀寫方式。
比如現在需要將磁碟文件通過網路傳輸出去,那麼整個傳統的IO讀寫模型如下圖所示
傳統的IO讀寫其實就是read + write的操作,整個過程會分為如下幾步
用戶調用read()方法,開始讀取數據,此時發生一次上下文從用戶態到內核態的切換,也就是圖示的切換1 將磁碟數據通過DMA拷貝到內核緩存區 將內核緩存區的數據拷貝到用戶緩衝區,這樣用戶,也就是我們寫的代碼就能拿到文件的數據 read()方法返回,此時就會從內核態切換到用戶態,也就是圖示的切換2 當我們拿到數據之後,就可以調用write()方法,此時上下文會從用戶態切換到內核態,即圖示切換3 CPU將用戶緩衝區的數據拷貝到Socket緩衝區 將Socket緩衝區數據拷貝至網卡 write()方法返回,上下文重新從內核態切換到用戶態,即圖示切換4
整個過程發生了4次上下文切換和4次數據的拷貝,這在高併發場景下肯定會嚴重影響讀寫性能。
所以為了減少上下文切換次數和數據拷貝次數,就引入了零拷貝技術。
零拷貝
零拷貝技術是一個思想,指的是指電腦執行操作時,CPU不需要先將數據從某處記憶體複製到另一個特定區域。
實現零拷貝的有以下幾種方式
mmap() sendfile()
mmap()
mmap(memory map)是一種記憶體映射文件的方法,即將一個文件或者其它對象映射到進程的地址空間,實現文件磁碟地址和進程虛擬地址空間中一段虛擬地址的一一對映關係。
簡單地說就是內核緩衝區和應用緩衝區共用,從而減少了從讀緩衝區到用戶緩衝區的一次CPU拷貝。
比如基於mmap,上述的IO讀寫模型就可以變成這樣。
基於mmap IO讀寫其實就變成mmap + write的操作,也就是用mmap替代傳統IO中的read操作。
當用戶發起mmap調用的時候會發生上下文切換1,進行記憶體映射,然後數據被拷貝到內核緩衝區,mmap返回,發生上下文切換2;隨後用戶調用write,發生上下文切換3,將內核緩衝區的數據拷貝到Socket緩衝區,write返回,發生上下文切換4。
整個過程相比於傳統IO主要是不用將內核緩衝區的數據拷貝到用戶緩衝區,而是直接將數據拷貝到Socket緩衝區。上下文切換的次數仍然是4次,但是拷貝次數只有3次,少了一次CPU拷貝。
在Java中,提供了相應的api可以實現mmap,當然底層也還是調用Linux系統的mmap()實現的
FileChannel fileChannel = new RandomAccessFile("test.txt", "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());
如上代碼拿到MappedByteBuffer,之後就可以基於MappedByteBuffer去讀寫。
sendfile()
sendfile()跟mmap()一樣,也會減少一次CPU拷貝,但是它同時也會減少兩次上下文切換。
如圖,用戶發起sendfile()調用時會發生切換1,之後數據通過DMA拷貝到內核緩衝區,之後再將內核緩衝區的數據CPU拷貝到Socket緩衝區,最後拷貝到網卡,sendfile()返回,發生切換2。
同樣地,Java也提供了相應的api,底層還是操作系統的sendfile()
FileChannel channel = FileChannel.open(Paths.get("./test.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
//調用transferTo方法向目標數據傳輸
channel.transferTo(position, len, target);
通過FileChannel的transferTo方法即可實現。
transferTo方法(sendfile)主要是用於文件傳輸,比如將文件傳輸到另一個文件,又或者是網路。
在如上代碼中,並沒有文件的讀寫操作,而是直接將文件的數據傳輸到target目標緩衝區,也就是說,sendfile是無法知道文件的具體的數據的;但是mmap不一樣,他是可以修改內核緩衝區的數據的。假設如果需要對文件的內容進行修改之後再傳輸,只有mmap可以滿足。
通過上面的一些介紹,主要就是一個結論,那就是基於零拷貝技術,可以減少CPU的拷貝次數和上下文切換次數,從而可以實現文件高效的讀寫操作。
RocketMQ內部主要是使用基於mmap實現的零拷貝(其實就是調用上述提到的api),用來讀寫文件,這也是RocketMQ為什麼快的一個很重要原因。
CommitLog
前面提到消息需要持久化到磁碟文件中,而CommitLog其實就是存儲消息的文件的一個稱呼,所有的消息都存在CommitLog中,一個Broker實例只有一個CommitLog。
由於消息數據可能會很大,同時兼顧記憶體映射的效率,不可能將所有消息都寫到同一個文件中,所以CommitLog在物理磁碟文件上被分為多個磁碟文件,每個文件預設的固定大小是1G。
當生產者將消息發送過來的時候,就會將消息按照順序寫到文件中,當文件空間不足時,就會重新建一個新的文件,消息寫到新的文件中。
消息在寫入到文件時,不僅僅會包含消息本身的數據,也會包含其它的對消息進行描述的數據,比如這個消息來自哪台機器、消息是哪個topic的、消息的長度等等,這些數據會和消息本身按照一定的順序同時寫到文件中,所以圖示的消息其實是包含消息的描述信息的。
刷盤機制
RocketMQ在將消息寫到CommitLog文件中時並不是直接就寫到文件中,而是先寫到PageCache,也就是前面說的內核緩存區,所以RocketMQ提供了兩種刷盤機制,來將內核緩存區的數據刷到磁碟。
非同步刷盤
非同步刷盤就是指Broker將消息寫到PageCache的時候,就直接返回給生產者說消息存儲成功了,然後通過另一個後臺線程來將消息刷到磁碟,這個後臺線程是在RokcetMQ啟動的時候就會開啟。非同步刷盤方式也是RocketMQ預設的刷盤方式。
其實RocketMQ的非同步刷盤也有兩種不同的方式,一種是固定時間,預設是每隔0.5s就會刷一次盤;另一種就是頻率會快點,就是每存一次消息就會通知去刷盤,但不會去等待刷盤的結果,同時如果0.5s內沒被通知去刷盤,也會主動去刷一次盤。預設的是第一種固定時間的方式。
同步刷盤
同步刷盤就是指Broker將消息寫到PageCache的時候,會等待非同步線程將消息成功刷到磁碟之後再返回給生產者說消息存儲成功。
同步刷盤相對於非同步刷盤來說消息的可靠性更高,因為非同步刷盤可能出現消息並沒有成功刷到磁碟時,機器就宕機的情況,此時消息就丟了;但是同步刷盤需要等待消息刷到磁碟,那麼相比非同步刷盤吞吐量會降低。所以同步刷盤適合那種對數據可靠性要求高的場景。
如果你需要使用同步刷盤機制,只需要在配置文件指定一下刷盤機制即可。
高可用
在說高可用之前,先來完善一下前面的一些概念。
在前面介紹概念的時候也說過,一個RokcetMQ中可以有很多個Broker實例,相同的BrokerName稱為一個組,同一個Broker組下每個Broker實例保存的消息是一樣的,不同的Broker組保存的消息是不一樣的。
如圖所示,兩個BrokerA實例組成了一個Broker組,兩個BrokerB實例也組成了一個Broker組。
前面說過,每個Broker實例都有一個CommitLog文件來存儲消息的。那麼兩個BrokerA實例他們CommitLog文件存儲的消息是一樣的,兩個BrokerB實例他們CommitLog文件存儲的消息也是一樣的。
那麼BrokerA和BrokerB存的消息不一樣是什麼意思呢?
其實很容易理解,假設現在有個topicA存在BrokerA和BrokerB上,那麼topicA在BrokerA和BrokerB預設都會有4個隊列。
前面在說發消息的時候需要選擇一個隊列進行消息的發送,假設第一次選擇了BrokerA上的隊列發送消息,那麼此時這條消息就存在BrokerA上,假設第二次選擇了BrokerB上的隊列發送消息,那麼那麼此時這條消息就存在BrokerB上,所以說BrokerA和BrokerB存的消息是不一樣的。
那麼為什麼同一個Broker組內的Broker存儲的消息是一樣的呢?其實比較容易猜到,就是為了保證Broker的高可用,這樣就算Broker組中的某個Broker掛了,這個Broker組依然可以對外提供服務。
那麼如何實現同Broker組的Broker存的消息數據相同的呢?這就不得不提到Broker的高可用模式。
RocketMQ提供了兩種Broker的高可用模式
主從同步模式 Dledger模式
主從同步模式
在主從同步模式下,在啟動的時候需要在配置文件中指定BrokerId,在同一個Broker組中,BrokerId為0的是主節點(master),其餘為從節點(slave)。
當生產者將消息寫入到主節點是,主節點會將消息內容同步到從節點機器上,這樣一旦主節點宕機,從節點機器依然可以提供服務。
主從同步主要同步兩部分數據
topic等數據 消息
topic等數據是從節點每隔10s鐘主動去主節點拉取,然後更新本身緩存的數據。
消息是主節點主動推送到從節點的。當主節點收到消息之後,會將消息通過兩者之間建立的網路連接發送出去,從節點接收到消息之後,寫到CommitLog即可。
從節點有兩種方式知道主節點所在伺服器的地址,第一種就是在配置文件指定;第二種就是從節點在註冊到NameServer的時候會返回主節點的地址。
主從同步模式有一個比較嚴重的問題就是如果集群中的主節點掛了,這時需要人為進行干預,手動進行重啟或者切換操作,而非集群自己從從節點中選擇一個節點升級為主節點。
為瞭解決上述的問題,所以RocketMQ在4.5.0就引入了Dledger模式。
Dledger模式
在Dledger模式下的集群會基於Raft協議選出一個節點作為leader節點,當leader節點掛了後,會從follower中自動選出一個節點升級成為leader節點。所以Dledger模式解決了主從模式下無法自動選擇主節點的問題。
在Dledger集群中,leader節點負責寫入消息,當消息寫入leader節點之後,leader會將消息同步到follower節點,當集群中過半數(節點數/2 +1)節點都成功寫入了消息,這條消息才算真正寫成功。
至於選舉的細節,這裡就不多說了,有興趣的可以自行谷歌,還是挺有意思的。
消息消費
終於,在生產者成功發送消息到Broker,Broker在成功存儲消息之後,消費者要消費消息了。
消費者在啟動的時候會從NameSrever拉取消費者訂閱的topic的路由信息,這樣就知道訂閱的topic有哪些queue,以及queue所在Broker的地址信息。
為什麼消費者需要知道topic對應的哪些queue呢?
其實主要是因為消費者在消費消息的時候是以隊列為消費單元的,消費者需要告訴Broker拉取的是哪個隊列的消息,至於如何拉到消息的,後面再說。
消費的兩種模式
前面說過,消費者是有個消費者組的概念,在啟動消費者的時候會指定該消費者屬於哪個消費者組。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
一個消費者組中可以有多個消費者,不同消費者組之間消費消息是互不幹擾的。
在同一個消費者組中,消息消費有兩種模式。
集群模式 廣播模式
集群模式
同一條消息只能被同一個消費組下的一個消費者消費,也就是說,同一條消息在同一個消費者組底下只會被消費一次,這就叫集群消費。
集群消費的實現就是將隊列按照一定的演算法分配給消費者,預設是按照平均分配的。
如圖所示,將每個隊列分配只分配給同一個消費者組中的一個消費者,這樣消息就只會被一個消費者消費,從而實現了集群消費的效果。
RocketMQ預設是集群消費的模式。
廣播模式
廣播模式就是同一條消息可以被同一個消費者組下的所有消費者消費。
其實實現也很簡單,就是將所有隊列分配給每個消費者,這樣每個消費者都能讀取topic底下所有的隊列的數據,就實現了廣播模式。
如果你想使用廣播模式,只需要在代碼中指定即可。
consumer.setMessageModel(MessageModel.BROADCASTING);
ConsumeQueue
上一節我們提到消費者是從隊列中拉取消息的,但是這裡不經就有一個疑問,那就是