每日一句 軍人天生就捨棄了戰鬥的意義! 概述 RabitMQ 發佈確認,保證消息在磁碟上。 前提條件 1。隊列必須持久化 隊列持久化 2。隊列中的消息必須持久化 消息持久化 使用 三種發佈確認的方式: 1。單個發佈確認 2。批量發佈確認 3。非同步批量發佈確認 開啟發佈確認的方法 //創建一個連接工廠 ...
每日一句
軍人天生就捨棄了戰鬥的意義!
概述
RabitMQ 發佈確認,保證消息在磁碟上。
前提條件
1。隊列必須持久化 隊列持久化
2。隊列中的消息必須持久化 消息持久化
使用
三種發佈確認的方式:
1。單個發佈確認
2。批量發佈確認
3。非同步批量發佈確認
開啟發佈確認的方法
//創建一個連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
**
//開啟發佈確認
channel.confirmSelect();**
單個確認
最簡單的確認方式,它是一種同步發佈確認的方式,也就是說發送一個消息後只有它被確認,後續的消息才能繼續發佈。
最大缺點是:發佈速度特別的滿。
吞吐量:每秒不超過數百條發佈的消息
/**
* 單個確認
*/
public static void publishSingleMessage() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//生命隊列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
**//開啟發佈確認
channel.confirmSelect();**
//開始時間
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//單個消息馬上進行確認
** boolean b = channel.waitForConfirms();**
if (b) {
System.out.println("消息發送成功!!!");
}
}
//結束時間
long end = System.currentTimeMillis();
System.out.println("發送消息1000,單個發佈確認用時: " + (end - begin) + " ms");
}
批量確認
與單個等待確認消息相比,先發佈一批消息然後一起確認可以極大地提高吞吐量。
當然這種方式的缺點就是:當發生故障導致發佈出現問題時,不知道是哪個消息出現問題了,我們必須將整個批處理保存在記憶體中,以記錄重要的信息而後重新發佈消息。
當然這種方案仍然是同步的,也一樣阻塞消息的發佈
/**
* 批量確認
*/
public static void publishBatchMessage() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//生命隊列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
**//開啟發佈確認
channel.confirmSelect();
//批量確認消息大小
int batchSize = 100;
//未確認消息個數
int outstandingMessageCount = 0;**
//開始時間
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
**outstandingMessageCount++;
//發送的消息 == 確認消息的大小後才批量確認
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}**
}
**//為了確保還有剩餘沒有確認消息 再次確認
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}**
//結束時間
long end = System.currentTimeMillis();
System.out.println("發送消息1000,批量發佈確認100個用時: " + (end - begin) + " ms");
}
非同步確認
它是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功
/**
* 非同步批量確認
*
* @throws Exception
*/
public static void publishAsyncMessage() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
** //開啟發佈確認
channel.confirmSelect();
**
//線程安全有序的一個哈希表,適用於高併發的情況
//1.輕鬆的將序號與消息進行關聯 2.輕鬆批量刪除條目 只要給到序列號 3.支持併發訪問
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
**//確認收到消息的一個回調**
//1.消息序列號
//2.multiple 是否是批量確認
//false 確認當前序列號消息
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小於等於當前序列號的未確認消息 是一個 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除該部分未確認消息
confirmed.clear();
} else {
//只清除當前序列號的消息
outstandingConfirms.remove(sequenceNumber);
}
};
//未確認消息的回調
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("發佈的消息" + message + "未被確認,序列號" + sequenceNumber);
};
**//添加一個非同步確認的監聽器
//1.確認收到消息的回調
//2.未收到消息的回調
channel.addConfirmListener(ackCallback, nackCallback);**
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = "消息" + i;
**//channel.getNextPublishSeqNo()獲取下一個消息的序列號
//通過序列號與消息體進行一個關聯,全部都是未確認的消息體
//將發佈的序號和發佈消息保存到map中
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);**
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("發佈" + 1000 + "個非同步確認消息,耗時" + (end - begin) + "ms");
}
}
如何處理非同步未確認消息
最好的解決的解決方案就是把未確認的消息放到一個基於記憶體的能被髮佈線程訪問,適用於高併發的的隊列。
比如說用 ConcurrentLinkedQueue 、這個隊列在 confirm callbacks 與發佈線程之間進行消息的傳遞。
ConcurrentSkipListMap
等等都可。
面試題
如何保證消息不丟失?
就市面上常見的消息隊列而言,只要配置得當,我們的消息就不會丟失。
消息隊列主要有三個階段:
1。生產消息
2。存儲消息
3。消費消息
1。生產消息
生產者發送消息至 Broker ,需要處理 Broker 的響應,不論是同步還是非同步發送消息,同步和非同步回調都需要做好 try-catch ,妥善的處理響應。
如果 Broker 返回寫入失敗等錯誤消息,需要重試發送。
當多次發送失敗需要作報警,日誌記錄等。這樣就能保證在生產消息階段消息不會丟失。
2。存儲消息
存儲消息階段需要在消息刷盤之後再給生產者響應,假設消息寫入緩存中就返迴響應,那麼機器突然斷電這消息就沒了,而生產者以為已經發送成功了。
如果 Broker 是集群部署,有多副本機制,即消息不僅僅要寫入當前 Broker ,還需要寫入副本機中。
那配置成至少寫入兩台機子後再給生產者響應。這樣基本上就能保證存儲的可靠了。一臺掛了還有一臺還
在呢(假如怕兩台都掛了..那就再多些)。
3。消費消息
我們應該在消費者真正執行完業務邏輯之後,再發送給 Broker 消費成功,這才是真正的消費了。
所以只要我們在消息業務邏輯處理完成之後再給 Broker 響應,那麼消費階段消息就不會丟失
總結:
1。生產者 需要處理好 Broker 的響應,出錯情況下利用重試、報警等手段
2。Broker 需要控制響應的時機,單機情況下是消息刷盤後返迴響應,集群多副本情況下,即發送至兩個副本及以上的情況下再返迴響應。
3。消費者 需要在執行完真正的業務邏輯之後再返迴響應給 Broker
volatile 關鍵字的作用?
1。保證記憶體可見性
1.1 基本概念
可見性 是指線程之間的可見性,一個線程修改的狀態對另一個線程是可見的。也就是一個線程修改的結果,另一個線程馬上就能夠看到。
1.2 實現原理
當對非volatile變數進行讀寫的時候,每個線程先從主記憶體拷貝變數到CPU緩存中,如果電腦有多個CPU,每個線程可能在不同的CPU上被處理,這意味著每個線程可以拷貝到不同的CPU cache中。volatile變數不會被緩存在寄存器或者對其他處理器不可見的地方,保證了每次讀寫變數都從主記憶體中讀,跳過CPU cache這一步。當一個線程修改了這個變數的值,新值對於其他線程是立即得知的。
2。禁止指令重排序
2.1 基本概念
指令重排序是JVM為了優化指令、提高程式運行效率,在不影響單線程程式執行結果的前提下,儘可能地提高並行度。指令重排序包括編譯器重排序和運行時重排序。在JDK1.5之後,可以使用volatile變數禁止指令重排序。針對volatile修飾的變數,在讀寫操作指令前後會插入記憶體屏障,指令重排序時不能把後面的指令重排序到記憶體屏
示例說明:
double r = 2.1; //(1)
double pi = 3.14;//(2)
double area = pi*r*r;//(3)
雖然代碼語句的定義順序為1->2->3,但是計算順序1->2->3與2->1->3對結果並無影響,所以編譯時和運行時可以根據需要對1、2語句進行重排序。
2.2 指令重排帶來的問題
線程A中
{
context = loadContext();
inited = true;
}
線程B中
{
if (inited)
fun(context);
}
如果線程A中的指令發生了重排序,那麼B中很可能就會拿到一個尚未初始化或尚未初始化完成的context,從而引發程式錯誤。
2.3 禁止指令重排的原理
olatile關鍵字提供記憶體屏障的方式來防止指令被重排,編譯器在生成位元組碼文件時,會在指令序列中插入記憶體屏障來禁止特定類型的處理器重排序。
JVM記憶體屏障插入策略:
- 每個volatile寫操作的前面插入一個StoreStore屏障;
- 在每個volatile寫操作的後面插入一個StoreLoad屏障;
- 在每個volatile讀操作的後面插入一個LoadLoad屏障;
- 在每個volatile讀操作的後面插入一個LoadStore屏障。
3。適用場景
(1)volatile關鍵字無法同時保證記憶體可見性和原子性。加鎖機制既可以確保可見性也可以確保原子性。
(2)volatile屏蔽掉了JVM中必要的代碼優化,所以在效率上比較低,因此一定在必要時才使用此關鍵字。
介紹一下Netty?
-
Netty是一個高性能、非同步事件驅動的NIO框架。
-
簡化並優化了TCP和UDP套接字等網路編程,性能和安全等很多方面都做了優化。
3.支持多種協議,如FTP、SMTP、HTTP以及各種二進位和基於文本的傳統協議。
在網路編程中,Netty是絕對的王者。
有很多開源項目都用到了Netty。
1。市面上很多消息推送系統都是基於Netty來做的。
2。我們常用的框架:Dubbo、RocketMQ、ES等等都用到了Netty。
使用Netty的項目統計:https://netty.io/wiki/related-projects.html
你好,我是yltrcc,日常分享技術點滴,歡迎關註我:ylcoder
本文由博客一文多發平臺 OpenWrite 發佈!