Redisson源碼解讀-公平鎖

来源:https://www.cnblogs.com/konghuanxi/archive/2022/11/08/16869998.html
-Advertisement-
Play Games

前言 我在上一篇文章聊了Redisson的可重入鎖,這次繼續來聊聊Redisson的公平鎖。下麵是官方原話: 它保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。所有請求線程會在一個隊列中排隊,當某個線程出現宕機時,Redisson會等待5秒後繼續下一個線程,也就是說 ...


前言

我在上一篇文章聊了Redisson的可重入鎖,這次繼續來聊聊Redisson的公平鎖。下麵是官方原話:

它保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。所有請求線程會在一個隊列中排隊,當某個線程出現宕機時,Redisson會等待5秒後繼續下一個線程,也就是說如果前面有5個線程都處於等待狀態,那麼後面的線程會等待至少25秒。

源碼版本:3.17.7

這是我 fork 的分支,添加了自己理解的中文註釋:https://github.com/xiaoguyu/redisson

公平鎖

先上官方例子:

RLock fairLock = redisson.getFairLock("anyLock");
// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();

因為在Redisson中,公平鎖和普通可重入鎖的邏輯大體上一樣,我在上一篇文章都介紹了,這裡就不再贅述。下麵開始介紹合理邏輯。

加鎖

加鎖的 lua 腳本在 RedissonFairLock#tryLockInnerAsync方法中

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        long wait = threadWaitTime;
        if (waitTime > 0) {
            wait = unit.toMillis(waitTime);
        }

        long currentTime = System.currentTimeMillis();
        if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
            ......
        }

        if (command == RedisCommands.EVAL_LONG) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                    // remove stale threads
                    "while true do " +  // list為空,證明沒有人排隊,退出迴圈
                        "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
                        "if firstThreadId2 == false then " +
                            "break;" +
                        "end;" +
                        // 能到這裡,證明有人排隊,拿出在排隊的第一個人的超時時間,如果超時了,則移除相應數據
                        "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
                        "if timeout <= tonumber(ARGV[4]) then " +
                            // remove the item from the queue and timeout set
                            // NOTE we do not alter any other timeout
                            "redis.call('zrem', KEYS[3], firstThreadId2);" +
                            "redis.call('lpop', KEYS[2]);" +
                        "else " +
                            "break;" +
                        "end;" +
                    "end;" +

                    // check if the lock can be acquired now
                    // 檢查是否可以獲取鎖。如果hash和list都不存在,或者線程隊列的第一個是當前線程,則可以獲取鎖
                    "if (redis.call('exists', KEYS[1]) == 0) " +
                        "and ((redis.call('exists', KEYS[2]) == 0) " +
                            "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +

                        // remove this thread from the queue and timeout set
                        // 都獲取鎖了,當然要從線程隊列和時間隊列中移除
                        "redis.call('lpop', KEYS[2]);" +
                        "redis.call('zrem', KEYS[3], ARGV[2]);" +

                        // decrease timeouts for all waiting in the queue
                        // 刷新時間集合中的時間
                        "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
                        "for i = 1, #keys, 1 do " +
                            "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
                        "end;" +

                        // acquire the lock and set the TTL for the lease
                        // 和公平鎖的設置一樣,值加1並且設置過期時間
                        "redis.call('hset', KEYS[1], ARGV[2], 1);" +
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        "return nil;" +
                    "end;" +

                    // check if the lock is already held, and this is a re-entry
                    // 能到這裡,證明前面拿不到鎖,但是也要做可重入鎖的處理
                    "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2],1);" +
                        "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        "return nil;" +
                    "end;" +

                    // the lock cannot be acquired
                    // check if the thread is already in the queue
                    // 時間集合中有值,證明線程已經在隊列中,不需要往後執行邏輯了
                    "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
                    "if timeout ~= false then " +
                        // the real timeout is the timeout of the prior thread
                        // in the queue, but this is approximately correct, and
                        // avoids having to traverse the queue
                        // 因為下麵的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])
                        // 所以這裡的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])
                        "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
                    "end;" +

                    // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
                    // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
                    // threadWaitTime
                    "local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
                    "local ttl;" +
                    // 如果最後一個線程不是當前線程,則從時間集合取出(舉例:線程1/2/3按順序獲取鎖,此時pttl得到的是線程1的鎖過期時間,zscore拿到的是線程2的鎖的過期時間,此時線程3應該以線程2的為準)
                    "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
                        "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
                    "else " +
                        // 否則直接獲取鎖的存活時間
                        "ttl = redis.call('pttl', KEYS[1]);" +
                    "end;" +
                    // 過期時間 = 鎖存活時間 + 等待時間 + 當前時間戳
                    "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
                    // 如果添加到時間集合成功,則同時添加線程集合
                    "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
                        "redis.call('rpush', KEYS[2], ARGV[2]);" +
                    "end;" +
                    "return ttl;",
                    Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
                    unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime);
        }

        throw new IllegalArgumentException();
    }

公平鎖總共用了Redis的三種數據類型,對應著 lua 腳本裡面的keys1、2、3的參數:

  • KEYS[1]

    鎖的名字,使用 Hash 數據類型,是可重入鎖的基礎,結構為 {”threadId1”: 1, “thread2”: 1},key為線程id,value是鎖的次數

  • KEYS[2]

    線程隊列的名字,使用 List 數據類型,結構為 [ “threadId1”, “threadId2” ],按順序存放需要獲取鎖的線程的id

  • KEYS[3]

    時間隊列的名字,使用 sorted set 數據類型,結構為 {”threadId2”:123, “threadId1”:190},key為線程id,value為獲取鎖的超時時間戳

我下麵會用 鎖、線程隊列、時間隊列 來表示這3個數據結構,需要註意下我的表述。

同樣的,介紹下參數:

  • ARGV[1]:leaseTime 鎖的持有時間
  • ARGV[2]:線程id(描述不太準確,暫時按這樣理解)
  • ARGV[3]:waitTime 嘗試獲取鎖的最大等待時間
  • ARGV[4]:currentTime 當前時間戳

接下來,我們一段一段分析 lua 腳本,首先看最開始的 while 迴圈

"while true do " +  // list為空,證明沒有人排隊,退出迴圈
    "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
    "if firstThreadId2 == false then " +
        "break;" +
    "end;" +
    // 能到這裡,證明有人排隊,拿出在排隊的第一個人的超時時間,如果超時了,則移除相應數據
    "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
    "if timeout <= tonumber(ARGV[4]) then " +
        // 從時間隊列和線程隊列中移除
        "redis.call('zrem', KEYS[3], firstThreadId2);" +
        "redis.call('lpop', KEYS[2]);" +
    "else " +
        "break;" +
    "end;" +
"end;" +

具體的邏輯我在註釋中寫的很清楚了,看的時候記住 KEYS[2]、KEYS[3] 對應著線程隊列和時間隊列介面。主要註意的是,線程隊列只有當一個線程持有鎖,另一個線程獲取不到鎖時,才會有值(前面有人才排隊,沒人排什麼隊)。接著看第二段

// 檢查是否可以獲取鎖。當鎖不存在,並且線程隊列不存在或者線程隊列第一位是當前線程,則可以獲取鎖
"if (redis.call('exists', KEYS[1]) == 0) " +
    "and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +

    // remove this thread from the queue and timeout set
    // 都獲取鎖了,當然要從線程隊列和時間隊列中移除
    "redis.call('lpop', KEYS[2]);" +
    "redis.call('zrem', KEYS[3], ARGV[2]);" +

    // decrease timeouts for all waiting in the queue
    // 刷新時間隊列中的時間
    "local keys = redis.call('zrange', KEYS[3], 0, -1);" +
    "for i = 1, #keys, 1 do " +
        "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
    "end;" +

    // acquire the lock and set the TTL for the lease
    // 和公平鎖的設置一樣,值加1並且設置過期時間
    "redis.call('hset', KEYS[1], ARGV[2], 1);" +
    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
    "return nil;" +
"end;" +

翻譯翻譯就是,鎖不存在(別人沒有持有鎖)並且線程隊列不存在或者線程隊列第一位是當前線程(不用排隊或者自己排第一)才能獲得鎖。因為時間隊列中存放的是各個線程等待鎖的超時時間戳,所以每次都需要刷新下。繼續下一段邏輯

// 能到這裡,證明前面拿不到鎖,但是也要做可重入鎖的處理
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
    "redis.call('hincrby', KEYS[1], ARGV[2],1);" +
    "redis.call('pexpire', KEYS[1], ARGV[1]);" +
    "return nil;" +
"end;" +

這是可重入鎖的處理,繼續下一段

// 時間隊列中有值,證明線程已經在隊列中,不需要往後執行邏輯了
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
    // the real timeout is the timeout of the prior thread
    // in the queue, but this is approximately correct, and
    // avoids having to traverse the queue
    // 因為下麵的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])
    // 所以這裡的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])
    "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +

舉例子:線程1持有鎖,線程2嘗試第一次獲取鎖(不進入這段if),線程2第二次獲取鎖(進入了這段if)。繼續下一段

"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
// 如果最後一個線程不是當前線程,則從時間集合取出(舉例:線程1/2/3按順序獲取鎖,此時pttl得到的是線程1的鎖過期時間,zscore拿到的是線程2的鎖的過期時間,此時線程3應該以線程2的為準)
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
    "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
    // 否則直接獲取鎖的存活時間
    "ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
// 過期時間 = 鎖存活時間 + 等待時間 + 當前時間戳
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
// 如果添加到時間集合成功,則同時添加線程集合
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
    "redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",

ttl 這段的獲取邏輯,翻譯翻譯就是,如果前面有人排隊,就以前面的超時時間為準,如果沒人排隊,就拿鎖的超時時間。獲取到 ttl ,就對添加到線程集合和時間集合。

以上就是公平鎖的加鎖 lua 腳本的全部邏輯。講的有點亂,但是只要能搞清楚keys1、2、3對應著哪種數據類型,理解整個邏輯應該問題不大。

解鎖

解鎖的核心 lua 腳本是下麵這段RedissonFairLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // remove stale threads
            "while true do "  // 線程隊列為空,證明沒有人排隊,退出迴圈
            + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
            + "if firstThreadId2 == false then "
                + "break;"
            + "end; "
            // 能到這裡,證明有人排隊,拿出在排隊的第一個人的超時時間,如果超時了,則移除相應數據
            + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
            + "if timeout <= tonumber(ARGV[4]) then "
                + "redis.call('zrem', KEYS[3], firstThreadId2); "
                + "redis.call('lpop', KEYS[2]); "
            + "else "
                + "break;"
            + "end; "
          + "end;"
            // 如果鎖不存在,則通過訂閱發佈機制通知下一個等待中的線程
          + "if (redis.call('exists', KEYS[1]) == 0) then " + 
                "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                "if nextThreadId ~= false then " +
                    "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                "end; " +
                "return 1; " +
            "end;" +
            // 如果當前線程已經不存在鎖裡面,直接返回null
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 可重入鎖處理邏輯,對當前線程的鎖次數減1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                // 鎖次數仍然大於0,則刷新鎖的存活時間
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "end; " +

            // 刪除鎖
            "redis.call('del', KEYS[1]); " +
            // 訂閱發佈機制通知下一個等待中的線程
            "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
            "if nextThreadId ~= false then " +
                "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
            "end; " +
            "return 1; ",
            Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
            LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}

算了,不想寫了,看註釋吧。

總結

本文介紹了Redisson的公平鎖,邏輯大體上和普通可重入鎖一致,核心在於 lua 腳本,運用了Redis的3種數據類型。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 簡介: 代理模式,是結構型的設計模式。用於為其它對象提供一種代理以控制對這個對象的訪問。 目標對象可以是遠程的對象、創建開銷大的對象或需要安全控制的對象,並且可以在不改變目標對象的情況下添加一些額外的功能。 適用場景: 調用端不想或不能直接調用的對象。 服務端不想讓調用端看到核心實現。 優點: 服務 ...
  • 前言 大家早好、午好、晚好吖~ 最近,又爆出了許多例,身在長沙得我前段時間不是在做核酸就是在做核酸得路上 雖然現在還是隔一天一捅(小聲嗶嗶:我真的遭不住)希望疫情早日過去 疫情尚未結束,我們需要做好自己,時刻防範,不給別人添麻煩。 今天我們來嘗試用Python抓取世界疫情,實現可視化地圖展示。 採集 ...
  • Servlet03 11.練習 快捷鍵-可以快速地在訪問的文件件切換 ctrl+alt+向左箭頭:回到上次訪問的位置 ctrl+alt+向右箭頭:回到下一步訪問的位置 11.1CatServlet 首先創建項目servlet,配置好Tomcat,添加web應用支持。在web目錄下麵的WEB-INF目 ...
  • 1.實現攔截器 1.寫一個攔截器 繼承HandlerInterceptor preHandle: 調用時間: Controller方法處理之前【也就是路徑跳轉之前】; 執行順序: 鏈式Intercepter情況下,Intercepter按照聲明的順序一個接一個執行; 返回值: 返回值為true,則繼 ...
  • Tb/clock 這題要求給dut模塊一個時鐘。 module top_module ( ); reg clk; always #5 clk=~clk; initial begin clk = 0; end dut u0(clk); endmodule Tb/tb1 產生指定的波形,使用延時語句給信 ...
  • 面向對象之元類 一、什麼是元類 Python中一切皆為對象,對象是有類實例化生成; 類也是對象(類對象),生成類對象的類可稱之為元類; 所以,元類就是來創建類對象的,可稱之為類工廠; type是python內建元類,type是最上層的元類,也可稱為一切類對象的元類 二、元類推導流程 """推導步驟1 ...
  • 前言 嗨嘍~大家好呀,這裡是魔王吶 ! 知識點: 動態數據抓包 requests發送請求 json數據解析 開發環境: python 3.8 運行代碼 pycharm 2021.2 輔助敲代碼 requests pip install requests 思路分析 如何去實現一個案例: 簡單的 基礎知 ...
  • 哈嘍兄弟們,本節咱們來複習一下Python基礎入門中的if語句。 編程中經常需要檢查一系列條件,並據此決定採取什麼措施。在python中,if語句能檢測你的程式的當前狀態,並據此採取什麼措施。 if語句功能 可以作為條件測試檢查是否相等,檢查是不相等數值比較,檢查多個條件等!下麵來一些簡單的示例: ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...