RabitMQ 發佈確認

来源:https://www.cnblogs.com/yltrcc/archive/2022/05/24/16305880.html
-Advertisement-
Play Games

每日一句 軍人天生就捨棄了戰鬥的意義! 概述 RabitMQ 發佈確認,保證消息在磁碟上。 前提條件 1。隊列必須持久化 隊列持久化 2。隊列中的消息必須持久化 消息持久化 使用 三種發佈確認的方式: 1。單個發佈確認 2。批量發佈確認 3。非同步批量發佈確認 開啟發佈確認的方法 //創建一個連接工廠 ...


每日一句

軍人天生就捨棄了戰鬥的意義!

概述

RabitMQ 發佈確認,保證消息在磁碟上。

前提條件

1。隊列必須持久化 隊列持久化

2。隊列中的消息必須持久化 消息持久化

使用

三種發佈確認的方式:

1。單個發佈確認

2。批量發佈確認

3。非同步批量發佈確認

開啟發佈確認的方法

 //創建一個連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
**
 //開啟發佈確認
channel.confirmSelect();**

單個確認

最簡單的確認方式,它是一種同步發佈確認的方式,也就是說發送一個消息後只有它被確認,後續的消息才能繼續發佈。

最大缺點是:發佈速度特別的滿。

吞吐量:每秒不超過數百條發佈的消息

/**
 * 單個確認
 */
public static void publishSingleMessage() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //生命隊列
    String queueName = UUID.randomUUID().toString();

    channel.queueDeclare(queueName, true, false, false, null);
    **//開啟發佈確認
    channel.confirmSelect();**
    //開始時間
    long begin = System.currentTimeMillis();

    for (int i = 0; i < 1000; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        //單個消息馬上進行確認
       ** boolean b = channel.waitForConfirms();**
        if (b) {
            System.out.println("消息發送成功!!!");
        }
    }

    //結束時間
    long end = System.currentTimeMillis();

    System.out.println("發送消息1000,單個發佈確認用時: " + (end - begin) + " ms");
}

批量確認

與單個等待確認消息相比,先發佈一批消息然後一起確認可以極大地提高吞吐量。

當然這種方式的缺點就是:當發生故障導致發佈出現問題時,不知道是哪個消息出現問題了,我們必須將整個批處理保存在記憶體中,以記錄重要的信息而後重新發佈消息。

當然這種方案仍然是同步的,也一樣阻塞消息的發佈

/**
 * 批量確認
 */
public static void publishBatchMessage() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //生命隊列
    String queueName = UUID.randomUUID().toString();

    channel.queueDeclare(queueName, true, false, false, null);
    **//開啟發佈確認
    channel.confirmSelect();
    //批量確認消息大小
    int batchSize = 100;
    //未確認消息個數
    int outstandingMessageCount = 0;**

    //開始時間
    long begin = System.currentTimeMillis();

    for (int i = 0; i < 1000; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        **outstandingMessageCount++;
        //發送的消息 == 確認消息的大小後才批量確認
        if (outstandingMessageCount == batchSize) {
            channel.waitForConfirms();
            outstandingMessageCount = 0;
        }**
    }
    **//為了確保還有剩餘沒有確認消息 再次確認
    if (outstandingMessageCount > 0) {
        channel.waitForConfirms();
    }**
    //結束時間
    long end = System.currentTimeMillis();

    System.out.println("發送消息1000,批量發佈確認100個用時: " + (end - begin) + " ms");
}

非同步確認

它是利用回調函數來達到消息可靠性傳遞的,這個中間件也是通過函數回調來保證是否投遞成功

/**
 * 非同步批量確認
 *
 * @throws Exception
 */
public static void publishAsyncMessage() throws Exception {
    try (Channel channel = RabbitMqUtils.getChannel()) {
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
       ** //開啟發佈確認
        channel.confirmSelect();
**
        //線程安全有序的一個哈希表,適用於高併發的情況
        //1.輕鬆的將序號與消息進行關聯 2.輕鬆批量刪除條目 只要給到序列號 3.支持併發訪問
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        **//確認收到消息的一個回調**
        //1.消息序列號
        //2.multiple  是否是批量確認
        //false 確認當前序列號消息
        ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
            if (multiple) {
                //返回的是小於等於當前序列號的未確認消息 是一個 map
                ConcurrentNavigableMap<Long, String> confirmed =
                        outstandingConfirms.headMap(sequenceNumber, true);
                //清除該部分未確認消息
                confirmed.clear();
            } else {
                //只清除當前序列號的消息
                outstandingConfirms.remove(sequenceNumber);
            }
        };
        //未確認消息的回調
        ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
            String message = outstandingConfirms.get(sequenceNumber);
            System.out.println("發佈的消息" + message + "未被確認,序列號" + sequenceNumber);
        };

        **//添加一個非同步確認的監聽器
        //1.確認收到消息的回調
        //2.未收到消息的回調
        channel.addConfirmListener(ackCallback, nackCallback);**

        long begin = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            String message = "消息" + i;
            **//channel.getNextPublishSeqNo()獲取下一個消息的序列號
            //通過序列號與消息體進行一個關聯,全部都是未確認的消息體
            //將發佈的序號和發佈消息保存到map中
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);**
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        long end = System.currentTimeMillis();
        System.out.println("發佈" + 1000 + "個非同步確認消息,耗時" + (end - begin) + "ms");
    }

}

如何處理非同步未確認消息

最好的解決的解決方案就是把未確認的消息放到一個基於記憶體的能被髮佈線程訪問,適用於高併發的的隊列。
比如說用 ConcurrentLinkedQueue 、這個隊列在 confirm callbacks 與發佈線程之間進行消息的傳遞。

ConcurrentSkipListMap

等等都可。

面試題

如何保證消息不丟失?

就市面上常見的消息隊列而言,只要配置得當,我們的消息就不會丟失。

消息隊列主要有三個階段:

1。生產消息

2。存儲消息

3。消費消息

1。生產消息

生產者發送消息至 Broker ,需要處理 Broker 的響應,不論是同步還是非同步發送消息,同步和非同步回調都需要做好 try-catch ,妥善的處理響應。

如果 Broker 返回寫入失敗等錯誤消息,需要重試發送。

當多次發送失敗需要作報警,日誌記錄等。這樣就能保證在生產消息階段消息不會丟失。

2。存儲消息

存儲消息階段需要在消息刷盤之後再給生產者響應,假設消息寫入緩存中就返迴響應,那麼機器突然斷電這消息就沒了,而生產者以為已經發送成功了。

如果 Broker 是集群部署,有多副本機制,即消息不僅僅要寫入當前 Broker ,還需要寫入副本機中。

那配置成至少寫入兩台機子後再給生產者響應。這樣基本上就能保證存儲的可靠了。一臺掛了還有一臺還

在呢(假如怕兩台都掛了..那就再多些)。

3。消費消息

我們應該在消費者真正執行完業務邏輯之後,再發送給 Broker 消費成功,這才是真正的消費了。

所以只要我們在消息業務邏輯處理完成之後再給 Broker 響應,那麼消費階段消息就不會丟失

總結:

1。生產者 需要處理好 Broker 的響應,出錯情況下利用重試、報警等手段

2。Broker 需要控制響應的時機,單機情況下是消息刷盤後返迴響應,集群多副本情況下,即發送至兩個副本及以上的情況下再返迴響應。

3。消費者 需要在執行完真正的業務邏輯之後再返迴響應給 Broker

volatile 關鍵字的作用?

1。保證記憶體可見性

1.1 基本概念

可見性 是指線程之間的可見性,一個線程修改的狀態對另一個線程是可見的。也就是一個線程修改的結果,另一個線程馬上就能夠看到。

1.2 實現原理

當對非volatile變數進行讀寫的時候,每個線程先從主記憶體拷貝變數到CPU緩存中,如果電腦有多個CPU,每個線程可能在不同的CPU上被處理,這意味著每個線程可以拷貝到不同的CPU cache中。volatile變數不會被緩存在寄存器或者對其他處理器不可見的地方,保證了每次讀寫變數都從主記憶體中讀,跳過CPU cache這一步。當一個線程修改了這個變數的值,新值對於其他線程是立即得知的。

2。禁止指令重排序

2.1 基本概念

指令重排序是JVM為了優化指令、提高程式運行效率,在不影響單線程程式執行結果的前提下,儘可能地提高並行度。指令重排序包括編譯器重排序和運行時重排序。在JDK1.5之後,可以使用volatile變數禁止指令重排序。針對volatile修飾的變數,在讀寫操作指令前後會插入記憶體屏障,指令重排序時不能把後面的指令重排序到記憶體屏

示例說明:
double r = 2.1; //(1) 
double pi = 3.14;//(2) 
double area = pi*r*r;//(3)
雖然代碼語句的定義順序為1->2->3,但是計算順序1->2->3與2->1->3對結果並無影響,所以編譯時和運行時可以根據需要對1、2語句進行重排序。

2.2 指令重排帶來的問題

線程A中
{
    context = loadContext();
    inited = true;
}

線程B中
{
    if (inited) 
        fun(context);
}
如果線程A中的指令發生了重排序,那麼B中很可能就會拿到一個尚未初始化或尚未初始化完成的context,從而引發程式錯誤。

2.3 禁止指令重排的原理

olatile關鍵字提供記憶體屏障的方式來防止指令被重排,編譯器在生成位元組碼文件時,會在指令序列中插入記憶體屏障來禁止特定類型的處理器重排序。

JVM記憶體屏障插入策略:

  • 每個volatile寫操作的前面插入一個StoreStore屏障;
  • 在每個volatile寫操作的後面插入一個StoreLoad屏障;
  • 在每個volatile讀操作的後面插入一個LoadLoad屏障;
  • 在每個volatile讀操作的後面插入一個LoadStore屏障。

3。適用場景

(1)volatile關鍵字無法同時保證記憶體可見性和原子性。加鎖機制既可以確保可見性也可以確保原子性。

(2)volatile屏蔽掉了JVM中必要的代碼優化,所以在效率上比較低,因此一定在必要時才使用此關鍵字。

介紹一下Netty?

  1. Netty是一個高性能、非同步事件驅動的NIO框架。

  2. 簡化並優化了TCP和UDP套接字等網路編程,性能和安全等很多方面都做了優化。

3.支持多種協議,如FTP、SMTP、HTTP以及各種二進位和基於文本的傳統協議。

在網路編程中,Netty是絕對的王者。

有很多開源項目都用到了Netty。

1。市面上很多消息推送系統都是基於Netty來做的。

2。我們常用的框架:Dubbo、RocketMQ、ES等等都用到了Netty。

使用Netty的項目統計:https://netty.io/wiki/related-projects.html

你好,我是yltrcc,日常分享技術點滴,歡迎關註我:ylcoder
本文由博客一文多發平臺 OpenWrite 發佈!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 第1章 課程介紹 試看2 節 | 10分鐘 包括課程概述、核心模塊、核心技術、課程安排、課程收穫、講授方式、學習前提等方面的介紹,讓同學們對課程項目有一個直觀的瞭解。 收起列表 視頻:1-1 課程導讀 (07:01)試看 視頻:1-2 課程適用於最新版node.js (02:08) 第2章 node ...
  • 第1章 課程導學與準備工作 試看4 節 | 33分鐘 本章節對課程的內容做介紹說明,以及本門課程能為學員帶來那些收穫。大家認真學習成為職業程式員。 收起列表 視頻:1-1 C++氣象項目數據中心實戰導學 (10:30)試看 視頻:1-2 項目介紹 (13:12)試看 視頻:1-3 開發環境 (01: ...
  • 第1章 課程介紹(磨刀不費砍柴工) 試看4 節 | 48分鐘 瞭解項目案例業務需求,觀看完整的項目演示。掌握學習本課程的方法,獲取課程授權碼,以及如何利用線上教程學習和答疑。 收起列表 視頻:1-1 課程導學 (17:37)試看 視頻:1-2 搭建開發環境 (18:11) 視頻:1-3 本課程學習方 ...
  • 目錄 一.簡介 二.效果演示 三.源碼下載 四.猜你喜歡 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 基礎 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 轉場 零基礎 O ...
  • 0、前言 本篇博客的初衷是為了特定的人弄的,這些內容在網上都可以找到原文,因此:這篇博客只是保證能夠讓特定的人看懂,其他人看得懂就看,看不懂拉倒,同時,這篇博客中細節說明沒有、運行截圖沒有、特別備註沒有...... 1、JUL 指的是Java Util Logging包,它是java原生的日誌框架, ...
  • 在kubernetes容器環境下 kafka會預設把主機名註冊到zookeeper。這個時候消費端部署在不同的命名空間或者不同的集群中會出現無法訪問的情況。用advertised.listeners配置可以重寫預設註冊的地址。 定義 listeners listeners 配置的是kafka Ser ...
  • 到底是什麼面試題, 讓一個工作了4年的精神小伙,只是去參加了一場技術面試, 就被搞得精神萎靡。鬱郁寡歡! 這一切的背後到底是道德的淪喪,還是人性的扭曲。 讓我們一起揭秘一下這道面試題。 關於, “簡述你對線程池的理解”,看看普通人和高手的回答。 普通人: 嗯。。。。。。。。。。 高手: 關於這個問題 ...
  • 目錄 一.簡介 二.效果演示 三.源碼下載 四.猜你喜歡 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 基礎 零基礎 OpenGL (ES) 學習路線推薦 : OpenGL (ES) 學習目錄 >> OpenGL ES 轉場 零基礎 O ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...