Zookeeper入門實戰(5)-分散式鎖

来源:https://www.cnblogs.com/wuyongyin/archive/2023/06/11/17316087.html
-Advertisement-
Play Games

在分散式環境中,當需要控制對某一資源的不同進程併發訪問時就需要使用分散式鎖;可以使用 ZooKeeper + Curator 來實現分散式鎖,本文主要介紹 Curator 中分散式鎖的使用,文中所使用到的軟體版本:Java 1.8.0_341、Zookeeper 3.7.1、curator 5.4. ...


在分散式環境中,當需要控制對某一資源的不同進程併發訪問時就需要使用分散式鎖;可以使用 ZooKeeper + Curator 來實現分散式鎖,本文主要介紹 Curator 中分散式鎖的使用,文中所使用到的軟體版本:Java 1.8.0_341、Zookeeper 3.7.1、curator 5.4.0。

1、引入依賴

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.4.0</version>
</dependency>

2、使用樣例

2.1、可重入鎖

@Test
public void interProcessMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    cf.start();
    InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                lock.acquire();
                //同一線程中可重覆獲取
                lock.acquire();
                logger.info(Thread.currentThread().getName() + "獲得了鎖");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //獲取了幾次就要釋放幾次
                release(lock);
                release(lock);
                logger.info(Thread.currentThread().getName() + "釋放了鎖");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.2、不可重入鎖

@Test
public void interProcessSemaphoreMutex() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    cf.start();
    InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                lock.acquire();
                //同一線程中不可重覆獲取
                //lock.acquire();
                logger.info(Thread.currentThread().getName() + "獲得了鎖");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(lock);
                logger.info(Thread.currentThread().getName() + "釋放了鎖");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.3、讀寫鎖(可重入)

@Test
public void interProcessReadWriteLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
    InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
    InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                readLock.acquire();
                readLock.acquire();
                logger.info(Thread.currentThread().getName() + "獲得了讀鎖");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //獲取了幾次就要釋放幾次
                release(readLock);
                release(readLock);
                logger.info(Thread.currentThread().getName() + "釋放了讀鎖");
            }
            try {
                writeLock.acquire();
                logger.info(Thread.currentThread().getName() + "獲得了寫鎖");
                Thread.sleep(1000 * 2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(writeLock);
                logger.info(Thread.currentThread().getName() + "釋放了寫鎖");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.4、信號量

信號量用於控制對資源同時訪問的進程或線程數。

@Test
public void interProcessSemaphoreV2() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            Lease lease = null;
            try {
                //獲取一個許可
                lease = semaphore.acquire();
                logger.info(Thread.currentThread().getName() + "獲得了許可");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //釋放一個許可
                semaphore.returnLease(lease);
                logger.info(Thread.currentThread().getName() + "釋放了許可");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

2.5、多個鎖作為單個實體管理

InterProcessMultiLock 主要功能是將多個鎖合併為一個對象來操作,簡化了代碼量。

@Test
public void InterProcessMultiLock() throws InterruptedException {
    CuratorFramework cf = getCuratorFramework();
    InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
    InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
    InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));

    cf.start();
    int size = 5;
    CountDownLatch countDownLatch = new CountDownLatch(size);
    for (int i = 0; i < size; i++) {
        new Thread(() -> {
            try {
                //相當於 lock.acquire() 和 lock2.acquire()
                multiLock.acquire();
                logger.info(Thread.currentThread().getName() + "獲得了鎖");
                Thread.sleep(1000 * 3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                release(multiLock);
                logger.info(Thread.currentThread().getName() + "釋放了鎖");
            }
            countDownLatch.countDown();
        }).start();
    }
    countDownLatch.await();
    cf.close();
}

 

分散式鎖使用樣例的完整代碼如下:

package com.inspur.demo.general.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

public class CuratorLockCase {
    private static Logger logger = LoggerFactory.getLogger(CuratorLockCase.class);

    private static String connectString = "10.49.196.33:2181";
    private static int sessionTimeout = 40 * 1000;
    private static int connectionTimeout = 60 * 1000;

    /**
     * 可重入鎖
     */
    @Test
    public void interProcessMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        cf.start();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    lock.acquire();
                    //同一線程中可重覆獲取
                    lock.acquire();
                    logger.info(Thread.currentThread().getName() + "獲得了鎖");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //獲取了幾次就要釋放幾次
                    release(lock);
                    release(lock);
                    logger.info(Thread.currentThread().getName() + "釋放了鎖");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 不可重入鎖
     */
    @Test
    public void interProcessSemaphoreMutex() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        cf.start();
        InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    lock.acquire();
                    //同一線程中不可重覆獲取
                    //lock.acquire();
                    logger.info(Thread.currentThread().getName() + "獲得了鎖");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(lock);
                    logger.info(Thread.currentThread().getName() + "釋放了鎖");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 讀寫鎖(可重入)
     */
    @Test
    public void interProcessReadWriteLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3");
        InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
        InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    readLock.acquire();
                    readLock.acquire();
                    logger.info(Thread.currentThread().getName() + "獲得了讀鎖");
                    Thread.sleep(1000 * 2);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //獲取了幾次就要釋放幾次
                    release(readLock);
                    release(readLock);
                    logger.info(Thread.currentThread().getName() + "釋放了讀鎖");
                }
                try {
                    writeLock.acquire();
                    logger.info(Thread.currentThread().getName() + "獲得了寫鎖");
                    Thread.sleep(1000 * 2);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(writeLock);
                    logger.info(Thread.currentThread().getName() + "釋放了寫鎖");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    /**
     * 信號量,用於控制對資源同時訪問的進程或線程數
     */
    @Test
    public void interProcessSemaphoreV2() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3);

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                Lease lease = null;
                try {
                    //獲取一個許可
                    lease = semaphore.acquire();
                    logger.info(Thread.currentThread().getName() + "獲得了許可");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //釋放一個許可
                    semaphore.returnLease(lease);
                    logger.info(Thread.currentThread().getName() + "釋放了許可");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }


    /**
     * 多個鎖作為單個實體管理
     */
    @Test
    public void InterProcessMultiLock() throws InterruptedException {
        CuratorFramework cf = getCuratorFramework();
        InterProcessLock lock = new InterProcessMutex(cf, "/test/lock");
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2");
        InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2));

        cf.start();
        int size = 5;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    //相當於 lock.acquire() 和 lock2.acquire()
                    multiLock.acquire();
                    logger.info(Thread.currentThread().getName() + "獲得了鎖");
                    Thread.sleep(1000 * 3);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    release(multiLock);
                    logger.info(Thread.currentThread().getName() + "釋放了鎖");
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        cf.close();
    }

    private CuratorFramework getCuratorFramework() {
        RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .retryPolicy(retryPolicy)
                .build();
        return cf;
    }

    private void release(InterProcessLock lock) {
        if (lock != null) {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
CuratorLockCase.java

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 如果某個派生自 QObject 的類重寫 eventFilter 方法,那它就成了事件過濾器(Event Filter)。該方法的聲明如下: virtual bool eventFilter(QObject *watched, QEvent *event); watched 參數是監聽事件的對象,即 ...
  • # Go 連接 MySQL之 MySQL 預處理 ## 一、ChatGPT 關於 MySQL 預處理 的回答 ### 問:什麼是MySQL 的預處理 具體執行過程時什麼 #### ChatGPT 答: MySQL的預處理是一種在執行SQL語句之前,先進行編譯和優化的機制。它將SQL語句分成兩個階段: ...
  • ## UDP 簡介 UDP(User Datagram Protocol,用戶數據報協議)是傳輸層的另一種協議,比 TCP 具有更快的傳輸速度,但是不可靠。UDP 發送的數據單元被稱為 UDP 數據報,當網路傳輸 UDP 數據報時,無法保證數據報一定到達目的地,也無法保證各個數據報按發送的順序到達目 ...
  • 博客推行版本更新,成果積累制度,已經寫過的博客還會再次更新,不斷地琢磨,高質量高數量都是要追求的,工匠精神是學習必不可少的精神。因此,大家有何建議歡迎在評論區踴躍發言,你們的支持是我最大的動力,你們敢投,我就敢肝 ...
  • title: "Go 語言連接資料庫實現增刪改查" date: 2023-06-10T18:55:16+08:00 draft: true tags: ["Go"] categories: ["Go"] # Go 連接 MySQL實現增刪改查 ## 一、初始化連接 ### 創建項目 ![](http ...
  • 一、Maven常用命令及其作用 Maven的生命周期包括:clean、validate、compile、test、package、verify、install、site、deploy,其中需要註意的是:執行後面的命令時,前面的命令自動得到執行,(其中,也可以跳過其中的步驟,如:test,在mvn i ...
  • 博客推行版本更新,成果積累制度,已經寫過的博客還會再次更新,不斷地琢磨,高質量高數量都是要追求的,工匠精神是學習必不可少的精神。因此,大家有何建議歡迎在評論區踴躍發言,你們的支持是我最大的動力,你們敢投,我就敢肝 ...
  • ## 簡介 猜拳小游戲是一個經典的小游戲項目,也是初學者學習編程的必要練手題目之一。在 Python 中,我們可以使用多種方式來實現一個簡單的猜拳小游戲。 本文將依次介紹六種Python實現猜拳小游戲的方法,包括:使用 `if-else` 條件語句、使用 `random` 模塊、使用字典映射勝負關係 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...