R2M分散式鎖原理及實踐

来源:https://www.cnblogs.com/Jcloud/archive/2023/02/07/17098240.html
-Advertisement-
Play Games

R2M分散式鎖原理可以理解為一條內容或者圖片+文字+鏈接的載體,常見的案例有鎖說明和分散式鎖選擇、r2m分散式鎖選擇、r2m分散式鎖原理,加鎖核心流程。 ...


作者:京東科技 張石磊

1 案例引入

名詞簡介:

資源:可以理解為一條內容,或者圖+文字+鏈接的載體。

檔位ID: 資源的分類組,資源必須歸屬於檔位。

問題描述:當同一個檔位下2條資源同時審批通過時,收到擎天審批系統2條消息,消費者應用部署了2台機器,此時正好由2台機器分別消費,在併發消費時,先更新資源狀態,然後寫緩存,每次取前100條資源,類似select * from resource where gear_id=xxx limit 100 order by id desc;

在寫檔位緩存,此時事務未提交,併發查詢時根據檔位Id查詢時查詢不到對方的數據,全量寫緩存時導致後寫的緩存覆蓋了先寫的緩存,即緩存被覆蓋,導致投放資源缺失。

方案思考 :

方案1:一臺機器消費mq–單點問題

方案2:將同檔位ID的資源路由到同一個queue,需要審批系統配合根據檔位Id做路由,審批系統發的消息不只是cms審批數據,此方案不適用。

方案3:在檔位級別加分散式鎖。

經比較,最終採用方案3是合適的方案.

2 鎖說明和分散式鎖選擇

synchronized鎖的粒度是JVM進程維度,集群模式下,不能對共用資源加鎖,此時需要跨JVM進程的分散式鎖。

分散式鎖方式核心實現方式優點缺點分析

1 資料庫:

悲觀鎖,lock

樂觀鎖,通過版本號實現version

實現簡單,不依賴中間件

資料庫IO瓶頸,性能差

單實例存在單點問題,主從架構存在數據不一致,主從切換時其他客戶端可重覆加鎖。

2 zookeeper

創建臨時節點

CP模型,可靠性高,不存在主從切換不一致問題

頻繁創建和銷毀臨時節點,且

集群模式下,leader數據需要同步到follower才算加鎖成功,性能不如redis

主從切換服務不可用

3 redis集群

setnx+expire

性能高

有封裝好的框架redission

支持超時自動刪除鎖

集群支持高可用,AP模型

主從切換時其他客戶端可重覆加鎖。

R2M是基於開源的Redis cluster(Redis 3.0以上版本)構建的高性能分散式緩存系統,我們系統一直在使用,3.2.5版本開始支持分散式鎖。

3 r2m分散式鎖原理

示例代碼:

String lockKey = CacheKeyHelper.getGearLockKey(EnvEnum.getEnvFlagEnum(envCode),resource.getGearId());

R2mLock lock = (R2mLock) r2mClusterClient.getLock(lockKey);

//獲取鎖,鎖的預設有效期30s,獲取不到鎖就阻塞

lock.lock();

try {

    //業務代碼

    resourceService.afterApprovedHandle(resource);

}  finally {

    //釋放鎖

    lock.unlock();

}



1 加鎖核心流程:

加鎖流程圖:

1):嘗試獲取鎖,通過執行加鎖Lua腳本來做;

2):若第一步未獲取到鎖,則去redis訂閱解鎖消息

3):一旦持有鎖的線程釋放了鎖,就會廣播解鎖消息,其他線程自旋重新嘗試獲取鎖。

核心加鎖原理:使用lua腳本封裝了hset和pexpire命令,保證是一個原子操作, KEYS[1]是加鎖的key,argv[2]是加鎖的客戶端ID(UUID+線程ID),ARGV[1]是鎖的有效期,預設30s.

private Object acquireInternal(List<String> args) {

if (!this.setLocked() && this.getHolderId() != Thread.currentThread().getId()) {

return -1L;

} else {

try {

//hash結構,hash的key是加鎖的key,鍵值對的key為客戶端的UUID+線程id,value為鎖的重入計數器值。

return this.lockSha() != null ? this.executor.evalR2mLockSha(this.lockSha(),

"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;

return -2;", Collections.singletonList(this.lockName), args) : this.executor. == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return -2;", Collections.singletonList(this.lockName), args);

} catch (Exception var3) {

this.setUnlocked();

throw new R2mLockException("Failed to acquire lock " + this.lockName + ".", var3);

}

}

}



args參數

private List<String> acquireArgs(long leaseTime) {

        List<String> args = new ArrayList();

        if (leaseTime != -1L) {

            args.add(String.valueOf(leaseTime));

        } else {

             //預設30s

            args.add(String.valueOf(this.internalLockLeaseTime()));

        }

        //UUID+當前線程id

        args.add(this.currentThreadLockId(Thread.currentThread().getId()));

        return args;

    }





獲取鎖失敗訂閱鎖的channel

//獲取鎖失敗,訂閱釋放鎖的消息

 private boolean failedAcquire() {

            this.subLock();

            return false;

        }

 

 private void subLock() {

            CompletableFuture<Void> cf = R2mLock.this.executor.lockSubscribe(R2mLock.this.lockPubSub(), R2mLock.this.getLockChannelName(), R2mLock.this);

            if (cf != null) {

                cf.handleAsync(this::reSubIfEx);

            }

 

        }



鎖釋放後,訂閱者通過自旋嘗試獲取鎖。

//tryAcquire獲取鎖,!tryAcquire就是獲取鎖失敗,鎖釋放後,通知線程喚醒後返回false,然後通過自旋,嘗試獲取鎖,

public final void acquire(long arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

 

 final boolean acquireQueued(final Node node, long arg) {

        boolean failed = true;

        try {

            boolean interrupted = false;

            //內部自旋獲取鎖

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }



2 釋放鎖核心邏輯:

1)刪除分散式鎖key(如果可重入鎖計數為0)

  1. 發釋放鎖的廣播消息

3)取消watchdog

private Object unlockInternal(List<String> args) {

        logger.debug("{} trying to unlock.", Thread.currentThread().getId());

 

        Object var2;

        try {

     //判斷鎖 key 是否存在,如果存在,然後遞減hash的value值,當value值為0時再刪除鎖key,並且廣播釋放鎖的消息

            if (this.unlockSha() == null) {

                var2 = this.executor. == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);

                return var2;

            }

 

            var2 = this.executor.evalR2mLockSha(this.unlockSha(), "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);

        } catch (Exception var6) {

            throw new R2mLockException("Failed to unlock " + this.lockName + ".", var6);

        } finally {

            this.finalizeRelease();

        }

 

        return var2;

    }

 

//取消當前線程的watchdog

 private void finalizeRelease() {

        long threadId = Thread.currentThread().getId();

        R2mLock.ExpirableEntry entry = (R2mLock.ExpirableEntry)this.entryCache.get(threadId);

        if (entry != null) {

            entry.release(threadId);

            if (entry.isReleased()) {

                //取消這個線程watchdog定時任務

                entry.getExtTask().cancel();

                this.expEntry.compareAndSet(entry, (Object)null);

                //從緩存watchdog線程的map中刪除該線程

                this.entryCache.remove(threadId);

            }

        }

 

    }



3 鎖的健壯性思考

1 業務沒執行完,鎖超時過期怎麼辦?

客戶端加鎖預設有效期30s,超過有效期後如果業務沒執行完,還需要持有這把鎖,r2m客戶端提供了續期機制,也就是watchdog機制。

watchdog原理:客戶端線程維度(UUID+線程ID,客戶端維護一個MAP,key就是UUID+線程ID)的後臺定時線程,獲取鎖成功後,如果客戶端還持有當前鎖,每隔10s(this.internalLockLeaseTime() / 3L),去延長鎖的有效期(internalLockLeaseTime)

//watchdog核心機制 ,internalLockLeaseTime預設30s

private void extendLock(long holderId) {

        if (this.expEntry.get() != null) {

            R2mLock.ExpirableEntry holderEntry = (R2mLock.ExpirableEntry)this.entryCache.get(holderId);

            if (holderEntry != null) {

                 //每隔10s,如果當前線程持有鎖,則續期30s

                if (this.expEntry.compareAndSet(holderEntry, holderEntry)) {

                    Timeout task = this.timer().newTimeout((timeout) -> {

                        if (this.extendLockInternal(holderId)) {

                            this.extendLock(holderId);

                        }

 

                    }, this.internalLockLeaseTime() / 3L, TimeUnit.MILLISECONDS);

                    if (this.expEntry.get() != null) {

                        ((R2mLock.ExpirableEntry)this.expEntry.get()).setExtTask(task);

                    }

                }

 

            }

        }

    }

 

 //執行續期lua腳本

 private boolean extendLockInternal(long threadId) {

        Object result;

        try {

           //只續期

            if (this.extendLockSha() != null) {

                result = this.executor.evalR2mLockSha(this.extendLockSha(), "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));

            } else {

                result = this.executor. == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));

            }

        } catch (Exception var5) {

            return false;

        }

 

        return Long.parseLong(result.toString()) == 1L;

    }



2 客戶端宕機,鎖如何釋放?

分散式鎖是有效期的,客戶端宕機後,watchdog機制失效,鎖過期自動失效。

3 redis分散式鎖集群模式下缺陷

r2m集群模式,極端情況,master加鎖成功,宕機,還未來得及同步到slave,主從切換,slave切換成master,可以繼續加鎖,對於非及其嚴格加鎖場景,該方案可滿足,屬於AP;對於嚴格場景下的分散式鎖,可採用基於zookeeper的分散式鎖,屬於CP,leader宕機,folllower選舉時不可用。性能上redis更優。

4 鎖的釋放問題

註意鎖的釋放在finally中釋放,必須由鎖的持有者釋放,不能由其他線程釋放別人的鎖,示例代碼中lock放到try的外面。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Flutter是Google推出的一款UI工具包,可以通過一套代碼同時在iOS和Android上構建媲美原生體驗的精美應用。它使用Dart作為開發語言,不依賴原生控制項,而是將自有的控制項庫,通過Skia圖形引擎直接繪製在平臺所提供的畫布上。簡單來說,它擁有以下特性:不依賴平臺、組件庫原生實現、能高速渲... ...
  • Flex 佈局目前已經非常流行了,現在幾乎已經相容所有瀏覽器了。在文章開始之前我們需要思考一個問題:我們為什麼要使用 Flex 佈局? 其實答案很簡單,那就是 Flex 佈局好用。一個新事物的出現往往是因為舊事物不那麼好用了,比如,如果想讓你用傳統的 css 佈局來實現一個塊元素垂直水平居中你會怎麼 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 使用須知 2017年下半年,微信6.5.21版本支持線上音視頻功能。開發者可以通過兩個音視頻組件 和 實現實時地線上直播、視頻通話、語音通話等功能。 上述功能需要用到兩個小程式媒體組件中的兩個: live-pusher 與 live-pl ...
  • 摘要:當協議、子功能變數名稱、主功能變數名稱、埠號中任意一個不相同時,都算作不同域。不同域之間相互請求資源,就算作“跨域”。 本文分享自華為雲社區《九種跨域方式實現原理咋回事》,作者:龍哥手記 一、什麼是跨域? 1.什麼是同源策略及其限制內容? 同源策略是一種約定,它是瀏覽器最核心也最基本的安全功能,如果缺少了同 ...
  • 作者: 京東零售 陳震 一、 什麼是Backbone 在前端的發展道路中,前端框架元老之一jQuery對繁瑣的DOM操作進行了封裝,提供了鏈式調用、各類選擇器,屏蔽了不同瀏覽器寫法的差異性,但是前端開發過程中依然存在作用域污染、代碼復用度低、冗餘度高、數據和事件綁定煩瑣等痛點。 5年後,Backbo ...
  • 1.Api 自動導入 unplugin-auto-import自動引入 composition api,不需要再手動引入。(npm 地址) 下載 npm i -D unplugin-auto-import 配置 vite.config.ts import AutoImport from "unplu ...
  • 如果想實現chatGPT的網頁版,調用介面就可以了,但是如果需要聯繫上下文語境,就需要在傳遞的數據的時候進行下拼接 傳參的時候對所有的對話數據進行拼接,拼成下麵這樣 {"prompt":"(You:在嗎\n)這裡在哦,有什麼可以幫助你的嗎?(You:你這個系統多少錢\n)抱歉,您想知道什麼?這裡是客 ...
  • 摘要:jstat命令可以查看堆記憶體各部分的使用量,以及載入類的數量。 本文分享自華為雲社區《JVM之通過jstat命令進行查看堆記憶體使用情況》,作者:共飲一杯無 。 基本概念 jstat是JDK自帶的一個輕量級小工具。它位於java的bin目錄下,主要利用JVM內建的指令對Java應用程式的資源和性 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...