zookeeper【5】分散式鎖

来源:https://www.cnblogs.com/tinyj/archive/2018/11/27/10029248.html
-Advertisement-
Play Games

我們常說的鎖是單進程多線程鎖,在多線程併發編程中,用於線程之間的數據同步,保護共用資源的訪問。而分散式鎖,指在分散式環境下,保護跨進程、跨主機、跨網路的共用資源,實現互斥訪問,保證一致性。 架構圖: 分散式鎖獲取思路a、在獲取分散式鎖的時候在locker節點下創建臨時順序節點,釋放鎖的時候刪除該臨時 ...


我們常說的鎖是單進程多線程鎖,在多線程併發編程中,用於線程之間的數據同步,保護共用資源的訪問。而分散式鎖,指在分散式環境下,保護跨進程、跨主機、跨網路的共用資源,實現互斥訪問,保證一致性。

 

架構圖:

 

分散式鎖獲取思路
a、在獲取分散式鎖的時候在locker節點下創建臨時順序節點,釋放鎖的時候刪除該臨時節點。

b、客戶端調用createNode方法在locker下創建臨時順序節點,然後調用getChildren(“locker”)來獲取locker下麵的所有子節點,註意此時不用設置任何Watcher。

c、客戶端獲取到所有的子節點path之後,如果發現自己創建的子節點序號最小,那麼就認為該客戶端獲取到了鎖。

d、如果發現自己創建的節點並非locker所有子節點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節點,然後對其調用exist()方法,同時對其註冊事件監聽器。

e、之後,讓這個被關註的節點刪除,則客戶端的Watcher會收到相應通知,此時再次判斷自己創建的節點是否是locker子節點中序號最小的,如果是則獲取到了鎖,如果不是則重覆以上步驟繼續獲取到比自己小的一個節點並註冊監聽。

 

實現代碼:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BaseDistributedLock {

    private final ZkClientExt client;
    private final String  path;
    private final String  basePath;
    private final String  lockName;
    private static final Integer  MAX_RETRY_COUNT = 10;

    public BaseDistributedLock(ZkClientExt client, String path, String lockName){

        this.client = client;
        this.basePath = path;
        this.path = path.concat("/").concat(lockName);
        this.lockName = lockName;

    }

    // 刪除成功獲取鎖之後所創建的那個順序節點
    private void deleteOurPath(String ourPath) throws Exception{
        client.delete(ourPath);
    }

    // 創建臨時順序節點
    private String createLockNode(ZkClient client, String path) throws Exception{
        return client.createEphemeralSequential(path, null);
    }

    // 等待比自己次小的順序節點的刪除
    private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{

        boolean  haveTheLock = false;
        boolean  doDelete = false;

        try {

            while ( !haveTheLock ) {
                // 獲取/locker下的經過排序的子節點列表
                List<String> children = getSortedChildren();

                // 獲取剛纔自己創建的那個順序節點名
                String sequenceNodeName = ourPath.substring(basePath.length()+1);

                // 判斷自己排第幾個
                int  ourIndex = children.indexOf(sequenceNodeName);
                if (ourIndex < 0){ // 網路抖動,獲取到的子節點列表裡可能已經沒有自己了
                    throw new ZkNoNodeException("節點沒有找到: " + sequenceNodeName);
                }

                // 如果是第一個,代表自己已經獲得了鎖
                boolean isGetTheLock = ourIndex == 0;

                // 如果自己沒有獲得鎖,則要watch比我們次小的那個節點
                String  pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);

                if ( isGetTheLock ){
                    haveTheLock = true;

                } else {

                    // 訂閱比自己次小順序節點的刪除事件
                    String  previousSequencePath = basePath .concat( "/" ) .concat( pathToWatch );
                    final CountDownLatch latch = new CountDownLatch(1);
                    final IZkDataListener previousListener = new IZkDataListener() {

                        public void handleDataDeleted(String dataPath) throws Exception {
                            latch.countDown(); // 刪除後結束latch上的await
                        }

                        public void handleDataChange(String dataPath, Object data) throws Exception {
                            // ignore
                        }
                    };

                    try {
                        //訂閱次小順序節點的刪除事件,如果節點不存在會出現異常
                        client.subscribeDataChanges(previousSequencePath, previousListener);

                        if ( millisToWait != null ) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 ) {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }

                            latch.await(millisToWait, TimeUnit.MICROSECONDS); // 在latch上await
                        } else {
                            latch.await(); // 在latch上await
                        }

                        // 結束latch上的等待後,繼續while重新來過判斷自己是否第一個順序節點
                    }
                    catch ( ZkNoNodeException e ) {
                        //ignore
                    } finally {
                        client.unsubscribeDataChanges(previousSequencePath, previousListener);
                    }

                }
            }
        }
        catch ( Exception e ) {
            //發生異常需要刪除節點
            doDelete = true;
            throw e;
        } finally {
            //如果需要刪除節點
            if ( doDelete ) {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if ( index >= 0 ) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }

    // 獲取/locker下的經過排序的子節點列表
    List<String> getSortedChildren() throws Exception {
        try{

            List<String> children = client.getChildren(basePath);
            Collections.sort(
                    children, new Comparator<String>() {
                        public int compare(String lhs, String rhs) {
                            return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
                        }
                    }
            );
            return children;

        } catch (ZkNoNodeException e){
            client.createPersistent(basePath, true);
            return getSortedChildren();
        }
    }

    protected void releaseLock(String lockPath) throws Exception{
        deleteOurPath(lockPath);
    }

    protected String attemptLock(long time, TimeUnit unit) throws Exception {

        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        int             retryCount = 0;

        //網路閃斷需要重試一試
        while ( !isDone ) {
            isDone = true;

            try {
                // 在/locker下創建臨時的順序節點
                ourPath = createLockNode(client, path);
                // 判斷自己是否獲得了鎖,如果沒有獲得那麼等待直到獲得鎖或者超時
                hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
            } catch ( ZkNoNodeException e ) { // 捕獲這個異常
                if ( retryCount++ < MAX_RETRY_COUNT ) { // 重試指定次數
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }
        if ( hasTheLock ) {
            return ourPath;
        }

        return null;
    }


}
import java.util.concurrent.TimeUnit;

public interface DistributedLock {

    /*
     * 獲取鎖,如果沒有得到就等待
     */
    public void acquire() throws Exception;

    /*
     * 獲取鎖,直到超時
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /*
     * 釋放鎖
     */
    public void release() throws Exception;


}
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class SimpleDistributedLockMutex extends BaseDistributedLock implements
        DistributedLock {

    //鎖名稱首碼,成功創建的順序節點如lock-0000000000,lock-0000000001,...
    private static final String LOCK_NAME = "lock-";

    // zookeeper中locker節點的路徑
    private final String basePath;

    // 獲取鎖以後自己創建的那個順序節點的路徑
    private String ourLockPath;

    private boolean internalLock(long time, TimeUnit unit) throws Exception {

        ourLockPath = attemptLock(time, unit);
        return ourLockPath != null;

    }

    public SimpleDistributedLockMutex(ZkClientExt client, String basePath){

        super(client,basePath,LOCK_NAME);
        this.basePath = basePath;

    }

    // 獲取鎖
    public void acquire() throws Exception {
        if ( !internalLock(-1, null) ) {
            throw new IOException("連接丟失!在路徑:'"+basePath+"'下不能獲取鎖!");
        }
    }

    // 獲取鎖,可以超時
    public boolean acquire(long time, TimeUnit unit) throws Exception {

        return internalLock(time, unit);
    }

    // 釋放鎖
    public void release() throws Exception {

        releaseLock(ourLockPath);
    }


}
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

public class TestDistributedLock {

    public static void main(String[] args) {

        final ZkClientExt zkClientExt1 = new ZkClientExt("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer());
        final SimpleDistributedLockMutex mutex1 = new SimpleDistributedLockMutex(zkClientExt1, "/Mutex");

        final ZkClientExt zkClientExt2 = new ZkClientExt("192.168.1.105:2181", 5000, 5000, new BytesPushThroughSerializer());
        final SimpleDistributedLockMutex mutex2 = new SimpleDistributedLockMutex(zkClientExt2, "/Mutex");

        try {
            mutex1.acquire();
            System.out.println("Client1 locked");
            Thread client2Thd = new Thread(new Runnable() {

                public void run() {
                    try {
                        mutex2.acquire();
                        System.out.println("Client2 locked");
                        mutex2.release();
                        System.out.println("Client2 released lock");

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            client2Thd.start();
            Thread.sleep(5000);
            mutex1.release();
            System.out.println("Client1 released lock");

            client2Thd.join();

        } catch (Exception e) {

            e.printStackTrace();
        }

    }

}
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.Callable;

public class ZkClientExt extends ZkClient {

    public ZkClientExt(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) {
        super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
    }

    @Override
    public void watchForData(final String path) {
        retryUntilConnected(new Callable<Object>() {

            public Object call() throws Exception {
                Stat stat = new Stat();
                _connection.readData(path, stat, true);
                return null;
            }

        });
    }

}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.列表定義: 列表由數據構成的有限序列,即按照一定的線性順序排列,排列而成的數據項的集合。 2.創建列表: 使用‘[ ]’括起來就已經創建了一個列表,例: 3.訪問列表中的值: 3.1 通過下標訪問列表中的值: 3.2 通過切片的方式,訪問多個列表值: 3.2.1 切片 格式:[start,end ...
  • 多線程 多個線程等待一個線程的一次性事件 背景:從多個線程訪問同一個std::future,也就是多個線程都在等待同一個線程的結果,這時怎麼處理。 辦法:由於std::future只能被調用一次get方法,也就是只能被某一個線程等待(同步)一次,不支持被多個線程等待。所以std::sharted_f ...
  • php timer.php 每500毫秒執行一次 ...
  • 1、什麼是進程 進程(Process)是電腦中的程式關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。 同一個程式執行兩次,就會產生兩個進程 ## 進程調度演算法: 先來先服務 短作業優先 時間片輪轉法 多級反饋隊列 2、併發和並行 併發是偽並行,只是看起來是 ...
  • 1.閉包函數也叫匿名函數,一個沒有指定名稱的函數,一般會用在回調部分 2.閉包作為回調的基本使用, echo preg_replace_callback('~-([a-z])~', function ($match) { return strtoupper($match[1]); }, 'hello... ...
  • 1.進程: 進程(Process)是電腦中的程式關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。在早期面向進程設計的電腦結構中,進程是程式的基本執行實體;在當代面向線程設計的電腦結構中,進程是線程的容器。程式是指令、數據及其組織形式的描述,進程是程式的 ...
  • 思路 首先肯定要樹形dp,一直沒想到怎麼用左偏樹。如果不斷彈出又不斷地合併複雜度不就太高了。瞄了眼題解才知道可以直接用大根樹。然後記錄出當前這棵左偏樹的大小(樹裡面所有點的薪水之和)以及點的個數。然後不斷的刪點。直到薪水滿足條件為止。 ...
  • 題意 "題目鏈接" Sol 直接把序列複製一遍 尾碼數組即可 在前$N$個位置中取$rak$最小的輸出 cpp include using namespace std; const int MAXN = 1e6 + 10; inline int read() { char c = getchar() ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...