在分散式環境中,當需要控制對某一資源的不同進程併發訪問時就需要使用分散式鎖;可以使用 ZooKeeper + Curator 來實現分散式鎖,本文主要介紹 Curator 中分散式鎖的使用,文中所使用到的軟體版本:Java 1.8.0_341、Zookeeper 3.7.1、curator 5.4. ...
在分散式環境中,當需要控制對某一資源的不同進程併發訪問時就需要使用分散式鎖;可以使用 ZooKeeper + Curator 來實現分散式鎖,本文主要介紹 Curator 中分散式鎖的使用,文中所使用到的軟體版本:Java 1.8.0_341、Zookeeper 3.7.1、curator 5.4.0。
1、引入依賴
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.4.0</version> </dependency>
2、使用樣例
2.1、可重入鎖
@Test public void interProcessMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一線程中可重覆獲取 lock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了鎖"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //獲取了幾次就要釋放幾次 release(lock); release(lock); logger.info(Thread.currentThread().getName() + "釋放了鎖"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.2、不可重入鎖
@Test public void interProcessSemaphoreMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一線程中不可重覆獲取 //lock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了鎖"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(lock); logger.info(Thread.currentThread().getName() + "釋放了鎖"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.3、讀寫鎖(可重入)
@Test public void interProcessReadWriteLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3"); InterProcessReadWriteLock.ReadLock readLock = lock.readLock(); InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock(); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { readLock.acquire(); readLock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了讀鎖"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { //獲取了幾次就要釋放幾次 release(readLock); release(readLock); logger.info(Thread.currentThread().getName() + "釋放了讀鎖"); } try { writeLock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了寫鎖"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { release(writeLock); logger.info(Thread.currentThread().getName() + "釋放了寫鎖"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.4、信號量
信號量用於控制對資源同時訪問的進程或線程數。
@Test public void interProcessSemaphoreV2() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { Lease lease = null; try { //獲取一個許可 lease = semaphore.acquire(); logger.info(Thread.currentThread().getName() + "獲得了許可"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //釋放一個許可 semaphore.returnLease(lease); logger.info(Thread.currentThread().getName() + "釋放了許可"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.5、多個鎖作為單個實體管理
InterProcessMultiLock 主要功能是將多個鎖合併為一個對象來操作,簡化了代碼量。
@Test public void InterProcessMultiLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2"); InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2)); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { //相當於 lock.acquire() 和 lock2.acquire() multiLock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了鎖"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(multiLock); logger.info(Thread.currentThread().getName() + "釋放了鎖"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
分散式鎖使用樣例的完整代碼如下:
package com.inspur.demo.general.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.concurrent.CountDownLatch; public class CuratorLockCase { private static Logger logger = LoggerFactory.getLogger(CuratorLockCase.class); private static String connectString = "10.49.196.33:2181"; private static int sessionTimeout = 40 * 1000; private static int connectionTimeout = 60 * 1000; /** * 可重入鎖 */ @Test public void interProcessMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一線程中可重覆獲取 lock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了鎖"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //獲取了幾次就要釋放幾次 release(lock); release(lock); logger.info(Thread.currentThread().getName() + "釋放了鎖"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } /** * 不可重入鎖 */ @Test public void interProcessSemaphoreMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一線程中不可重覆獲取 //lock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了鎖"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(lock); logger.info(Thread.currentThread().getName() + "釋放了鎖"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } /** * 讀寫鎖(可重入) */ @Test public void interProcessReadWriteLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3"); InterProcessReadWriteLock.ReadLock readLock = lock.readLock(); InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock(); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { readLock.acquire(); readLock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了讀鎖"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { //獲取了幾次就要釋放幾次 release(readLock); release(readLock); logger.info(Thread.currentThread().getName() + "釋放了讀鎖"); } try { writeLock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了寫鎖"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { release(writeLock); logger.info(Thread.currentThread().getName() + "釋放了寫鎖"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } /** * 信號量,用於控制對資源同時訪問的進程或線程數 */ @Test public void interProcessSemaphoreV2() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(cf, "/test/lock4", 3); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { Lease lease = null; try { //獲取一個許可 lease = semaphore.acquire(); logger.info(Thread.currentThread().getName() + "獲得了許可"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //釋放一個許可 semaphore.returnLease(lease); logger.info(Thread.currentThread().getName() + "釋放了許可"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } /** * 多個鎖作為單個實體管理 */ @Test public void InterProcessMultiLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); InterProcessLock lock2 = new InterProcessSemaphoreMutex(cf, "/test/lock2"); InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock, lock2)); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { //相當於 lock.acquire() 和 lock2.acquire() multiLock.acquire(); logger.info(Thread.currentThread().getName() + "獲得了鎖"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(multiLock); logger.info(Thread.currentThread().getName() + "釋放了鎖"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); } private CuratorFramework getCuratorFramework() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .retryPolicy(retryPolicy) .build(); return cf; } private void release(InterProcessLock lock) { if (lock != null) { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }CuratorLockCase.java