Redis高併發分散式鎖詳解

来源:https://www.cnblogs.com/chafry/archive/2022/09/30/16742964.html
-Advertisement-
Play Games

介紹了分散式鎖的特性,模擬想要實現redis分散式鎖的演變流程,分析redisson源碼是如何實現分散式鎖的,面對高併發下,我們該如何提升分散式鎖性能 ...


為什麼需要分散式鎖

  1.為瞭解決Java共用記憶體模型帶來的線程安全問題,我們可以通過加鎖來保證資源訪問的單一,如JVM內置鎖synchronized,類級別的鎖ReentrantLock。

  2.但是隨著業務的發展,單機服務畢竟存在著限制,故會往多台組合形成集群架構,面對集群架構,我們同樣存在則資源共用問題,而每台伺服器有著自己的JVM,這時候我們對於鎖的實現不得不考慮分散式的實現。

 

分散式鎖應該具備哪些條件

  1.在分散式系統環境下,一個方法在同一時間只能被一個機器的一個線程執行

  2.高可用的獲取鎖與釋放鎖

  3.高性能的獲取鎖與釋放鎖

  4.具備可重入特性(可理解為重新進入,由多於一個任務併發使用,而不必擔心數據錯誤)

  5.具備鎖失效機制,即自動解鎖,防止死鎖

  6.具備非阻塞鎖特性,即沒有獲取到鎖將直接返回獲取鎖失敗

 

秒殺搶購場景模擬(模擬併發問題:其實就是指每一步如果存在間隔時間,那麼當某一線程間隔時間拉長,會對其餘線程造成什麼影響

  0.如果要在本機測試的話

    1)配置Nginx實現負載均衡

http {
    upstream testfuzai {
        server 127.0.0.1:8080 weight=1;
        server 127.0.0.1:8090 weight=1;
    }
  
    server {
        listen 80;
        server_name localhost;
  
        location / {
            //proxy_pass:設置後端代理伺服器的地址。這個地址(address)可以是一個功能變數名稱或ip地址和埠,或者一個 unix-domain socket路徑。
            proxy_pass http://testfuzai;
            proxy_set_header Host $proxy_host;
        }
    }
}

    2)啟動redis設置好參數與數量

    3)啟動項目並分別配置不同埠(要與Nginx裡面的一致)

    4)進行壓測,通過jmeter的Thread Group裡面編輯好HTTP Request,設置參數 線程數 Number of Threads 【設置為200】 ,請求的重覆次數 Loop count 【設置為5】 ,Ramp-up period(seconds)線程啟動開始運行的時間間隔(單位是秒)【設置為1】。則,一秒內會有1000個請求打過去。

 

  1.不加鎖進行庫存扣減的情況:

    代碼示例

@RequestMapping("/deduct_stock")
public String deductStock() {
    //從redis取出庫存
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); 
    if (stock > 0) {
        int realStock = stock - 1;
        //往redis寫入庫存
        stringRedisTemplate.opsForValue().set("stock", realStock + ""); 
        System.out.println("扣減成功,剩餘庫存:" + realStock);
    } else {
        System.out.println("扣減失敗,庫存不足");
    }
    return "end";
}

 

    發現說明

      1)通過列印輸出,我們會發現兩台機器上會出現重覆的值(即出現了超賣現象)。甚至會出現另一臺伺服器的數據覆蓋本伺服器的數據。

      2)原因在於讀取數據和寫入數據存在時間差,如兩個伺服器Q1和Q1,Q1有請求,獲取庫存【假設300】,在庫存判斷大小之後進行扣減庫存如果慢了【假設需要3秒】,那麼Q2有5次請求,獲取到庫存,扣減完後設置,依次5次,則庫存為【295】。但是此時Q1完成自身請求又會把庫存設置為【299】。故不合理。所以應該改為使用stringRedisTemplate.boundValueOps("stock").increment(-1); 改為採用redis內部扣除,減少了超賣的個數。但是就算改了也只是避免了覆蓋問題,仍然沒有解決超賣問題。如果有6台伺服器,庫存剩下1個的時候六個請求同時進入到扣減庫存這一步,那麼就會出現超賣5個的現象(這也是超賣個數最多的現象)。

 

  2.採用SETNX的方式加分散式鎖的情況:

    代碼示例

public String deductStock() {
    String lockKey = "lock:product_101";
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
if (!result) { return "error_code"; } try {
     int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) { Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock"); System.out.println("扣減成功,剩餘庫存:" + realStock); } else { System.out.println("扣減失敗,庫存不足"); } } finally { stringRedisTemplate.delete(lockKey); } return "end"; }

    發現說明

      1)這種方式明顯保證了在分散式情況下只有一個線程能夠執行業務代碼。但是我們不可能對於用戶買商品的時候返回錯誤提示,如果不斷自旋的話又容易讓CPU飆升。肯定要考慮休眠與喚醒,但可以在上層方法裡面處理。

      2)同時很明顯存在個問題,如果我在扣減庫存時候伺服器宕機了,庫存扣減還沒設置【且沒執行finally代碼,那麼我這個商品的鎖就不會被釋放,除非手動清除】。

那麼肯定需要設置超時時間。如

Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

      會發現補一個超時時間的話依舊無法避免之前的問題,故加鎖和設置超時時間需要保持原子性

      3)採用原子操作:Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);基於設置了超時時間,那麼我們如何考量超時時間呢,業務執行多久我們根本不可得知。故容易出現時間到期了,業務還沒執行完。這就容易出現A持有鎖執行任務,還沒完成就超時了,B持有鎖執行任務,A執行完,釋放鎖【此時會釋放B的鎖】的情況。所以釋放鎖必須要持有鎖本人才能執行。

if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
    stringRedisTemplate.delete(lockKey);
}

      所以clientId需要是分散式ID,然後釋放鎖改為判斷clientId符合才能去釋放。

 

  3.改進之後的情況:

    代碼示例

public String deductStock() {
    String lockKey = "lock:product_101";
    String clientId = UUID.randomUUID().toString();
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
    if (!result) {
        return "error_code";
    }

    try {
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); 
 if (stock > 0) { Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock");  System.out.println("扣減成功,剩餘庫存:" + realStock); } else { System.out.println("扣減失敗,庫存不足"); } } finally { if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } return "end"; }

 

    發現說明

      1)即時加了判斷,我們會發現依舊會存在問題【因為判斷與釋放鎖操作不是原子性的】,如果在判斷裡面加上休眠進行試驗

if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
    Thread.sleep(20000);
    stringRedisTemplate.delete(lockKey);
}

      我們會發現根本問題依舊沒有解決,只是減少了發生的情況。究其原因,本質上還是鎖超時導致的。解決這個問題就要引入一個完美的解決方案叫做鎖續命

      2)鎖續命(watchDog):假設主線程搶到鎖開始執行業務邏輯,開啟一個分線程,在分線程裡邊做一個定時任務,比如說設置的鎖超時時間是30s,那麼我們的定時任務時間就設置為10s,定時任務設置的時間一定要比鎖超時時間小,每10s定時任務先去判斷主線程有沒有結束,沒有結束的話說明主線程就還在,還在進行業務邏輯操作,這個時候我們執行一條expire命令,將主線程鎖的超時時間重新設置為30s,這樣的話只要主線程還沒結束,主線程就會被分線程定時任務去做續命邏輯,維持在30s,判斷主線程結束,就不再執行續命邏輯。

 

Redisson分散式鎖框架剖析

  1.引入依賴

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>

  2.進行配置

@Bean
public Redisson redisson() {
    // 此為單機模式
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
    return (Redisson) Redisson.create(config);
}

  3.業務代碼展示

public String deductStock2() {
    String lockKey = "lock:product_101";
    //獲取鎖對象
    RLock redissonLock = redisson.getLock(lockKey);
    //加分散式鎖
    redissonLock.lock();
    try {
        Long stock = (Long) stringRedisTemplate.opsForValue().decrement("stock");
        if (stock > 0) {
            Long realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + "");
            System.out.println("扣減成功,剩餘庫存:" + realStock);
        } else {
            System.out.println("扣減失敗,庫存不足");
        }
    } finally {
        //解鎖
        redissonLock.unlock();
    }
    return "end";
}

     發現說明

      1.如果在集群架構下麵,分散式鎖如果在Master節點上寫成功了就會返回給客戶端,但是此時還需要同步給從節點。

      2.如果在此時間內Master節點結點宕機,那麼數據將會消失,而從節點上沒有鎖的信息(變為Master節點)。【主從架構鎖失效問題

 

  4.為解決主從架構鎖失效問題引入的RedLock(不建議用,因為本質上還是沒有解決主從架構鎖失效問題)

    0.原理展示

           

 

    1.redssion 集群配置(在resource下創建 redssion.yml文件)

clusterServersConfig:
  # 連接空閑超時,單位:毫秒 預設10000
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  # 同任何節點建立連接時的等待超時。時間單位是毫秒 預設10000
  connectTimeout: 10000
  # 等待節點回覆命令的時間。該時間從命令發送成功時開始計時。預設3000
  timeout: 3000
  # 命令失敗重試次數
  retryAttempts: 3
  # 命令重試發送時間間隔,單位:毫秒
  retryInterval: 1500
  # 重新連接時間間隔,單位:毫秒
  reconnectionTimeout: 3000
  # 執行失敗最大次數
  failedAttempts: 3
  # 密碼
  password: test1234
  # 單個連接最大訂閱數量
  subscriptionsPerConnection: 5
  clientName: null
  # loadBalancer 負載均衡演算法類的選擇
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  #從節點發佈和訂閱連接的最小空閑連接數
  slaveSubscriptionConnectionMinimumIdleSize: 1
  #從節點發佈和訂閱連接池大小 預設值50
  slaveSubscriptionConnectionPoolSize: 50
  # 從節點最小空閑連接數 預設值32
  slaveConnectionMinimumIdleSize: 32
  # 從節點連接池大小 預設64
  slaveConnectionPoolSize: 64
  # 主節點最小空閑連接數 預設32
  masterConnectionMinimumIdleSize: 32
  # 主節點連接池大小 預設64
  masterConnectionPoolSize: 64
  # 訂閱操作的負載均衡模式
  subscriptionMode: SLAVE
  # 只在從伺服器讀取
  readMode: SLAVE
  # 集群地址
  nodeAddresses:
    - "redis://IP地址:30001"  //
    - "redis://IP地址:30002"
    - "redis://IP地址:30003"
    - "redis://IP地址:30004"
    - "redis://IP地址:30005"
    - "redis://IP地址:30006"
  # 對Redis集群節點狀態掃描的時間間隔。單位是毫秒。預設1000
  scanInterval: 1000
  #這個線程池數量被所有RTopic對象監聽器,RRemoteService調用者和RExecutorService任務共同共用。預設2
threads: 0
#這個線程池數量是在一個Redisson實例內,被其創建的所有分散式數據類型和服務,以及底層客戶端所一同共用的線程池裡保存的線程數量。預設2
nettyThreads: 0
# 編碼方式 預設org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#傳輸模式
transportMode: NIO
# 分散式鎖自動過期時間,防止死鎖,預設30000
lockWatchdogTimeout: 30000
# 通過該參數來修改是否按訂閱發佈消息的接收順序出來消息,如果選否將對消息實行並行處理,該參數只適用於訂閱發佈消息的情況, 預設true
keepPubSubOrder: true
# 用來指定高性能引擎的行為。由於該變數值的選用與使用場景息息相關(NORMAL除外)我們建議對每個參數值都進行嘗試。

 

    2.代碼配置

@Bean
public RedissonClient redisson() throws IOException {
    Config config = Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream());
    RedissonClient redisson = Redisson.create(config);
    return redisson;
} 
//或者
@Bean
public Redisson redisson() {
    // 此為集群模式
    Config config = new Config();
    config.useClusterServers()
            .addNodeAddress("redis://127.0.0.1:6379")
            .addNodeAddress("redis://127.0.0.1:6389")
            .addNodeAddress("redis://127.0.0.1:6399")
            .addNodeAddress("redis://127.0.0.1:6369");
    return (Redisson) Redisson.create(config);
}

 

    3.業務代碼示例

@RequestMapping("/redlock")
public String redlock() {
    RLock lock1 = redisson.getLock("Key1_product_001");
    RLock lock2 = redisson.getLock("Key2_product_001");
    RLock lock3 = redisson.getLock("Key3_product_001");

    /**
     * 根據多個 RLock 對象構建 RedissonRedLock (最核心的差別就在這裡)
     */
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    try {
        /**
         * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
         * leaseTime   鎖的持有時間,超過這個時間鎖會自動失效(值應設置為大於業務處理的時間,確保在鎖有效期內業務能處理完)
         */
        boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
        if (res) {
            //成功獲得鎖,在這裡處理業務
        }
    } catch (Exception e) {
        throw new RuntimeException("lock fail");
    } finally {
        //無論如何, 最後都要解鎖
        redLock.unlock();
    }
    return "end";
}

 

    4.分析說明(為什麼不推薦用

      1)如果不是集群,為保證高可用,要對三個節點都添加了從節點(因為如果沒有從節點,線上只要有兩個服務宕機了,那麼這個分散式鎖將不再可用)

      2)針對三主三從的情況,A線程對redis_1_主 和 redis_2_主 加鎖成功,對 redis_3_主 加鎖失敗,則可以獲得分散式鎖,執行任務。但是還沒同步情況下,redis_1_主宕機,redis_1_從 晉升成功數據丟失,此時B線程來加鎖,redis_1_從加鎖成功和 redis_3_主 加鎖成功,對 redis_2_主 加鎖失敗,也能獲得分散式鎖。【概率不大但還是會存在問題】

      3)針對集群如果不搞主從【一旦出現宕機,數據量大,且訪問高的話,這裡面就存在著緩存雪崩的危機】,此外如果集群半數節點宕機,集群會被迫停了,此外如果加鎖節點越多,加鎖效率越低下。

      4)既然原理與zookeeper的差不多而且也損失了高性能的特性,那其實還不如使用zookeeper分散式鎖。 

 

  5.原理分析 

          

 

  6.源碼剖析

    1)Redisson類#getLock方法

public RLock getLock(String name) {
    return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.id = commandExecutor.getConnectionManager().getId();
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}

 

    2)Redisson類#lock方法

public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

//RedissonLock類#lockInterruptibly方法
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

//RedissonLock類#lockInterruptibly方法
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    //先在redis中發佈訂閱消息,等待用完鎖的線程通知
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);
    try {
        while (true) {
       //再次嘗試獲取鎖
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                break;
            }

            if (ttl >= 0) {
         //利用 Semaphore 信號量的方式獲得許可,但是這種休眠是定時的
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}

//RedissonLock類#tryAcquire方法
//利用future的方式阻塞式等待返回結果
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

//RedissonObject類#get方法
protected <V> V get(RFuture<V> future) {
    return commandExecutor.get(future);
}

//RedissonLock類#subscribe方法
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

//PublishSubscribe類#subscribe方法
public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
    final RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {

        @Override
        public void run() {
            // 1:判斷RedisLockEntry 是否存在
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            // 2:創建RedisLockEntry
            E value = createEntry(newPromise);
            value.aquire();
            
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            // 3:創建一個監聽器,別的線程進行redis-pub命令之後進行調用
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            // 4:底層交給netty調用redis-sub命令
            subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    semaphore.acquire(listener);
    listenerHolder.set(listener);
    
    return newPromise;
}

 

    3)RedissonLock類#tryAcquireAsync方法(核心點主體)

//RedissonLock類#tryAcquireAsync方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    //嘗試加鎖邏輯
 RFuture<Long> ttlRemainingFuture=tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    //添加監聽器
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        //Future任務執行完會回調該方法
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // 加鎖成功
            if (ttlRemaining == null) {
                //看門狗續命
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

 

    4)RedissonLock類#tryLockInnerAsync方法(核心點,加鎖邏輯

//RedissonLock類#tryLockInnerAsync方法
//利用redis的單線程執行任務,redis會將整個腳本作為一個整體執行,且中間不會被其他命令插入
//採用的是hash的類型來存儲鎖,為了實現重入鎖的概念
//Redis pttl命令以毫秒為單位返回 key 的剩餘過期時間
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                //對應為KEYS[1](對應傳入的鎖的命名),ARGV[1](設置的超時時間,預設30s) ,ARGV[2] -》(uuid + ":" + threadId)
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

 

    5)RedissonLock類#scheduleExpirationRenewal方法(核心點,看門狗的邏輯【續命】

//RedissonLock類#scheduleExpirationRenewal方法
//採用Future+事件監聽的方式,方法嵌套調用來實現定時任務
private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            //再次添加監聽器,重覆檢查
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    
                    if (future.getNow()) {
                        // reschedule itself  //遞歸調用
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    //如果該任務已經存在一個了,就把新建的任務關閉,Map中的key為(uuid + ":" + threadId)
    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

 

    6)Redisson類#unlock方法

//RedissonLock類#unlock方法
public void unlock() {
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException(...);
    }
    if (opStatus) {
        //移除看門狗的定時任務
        cancelExpirationRenewal();
    }

}

//RedissonLock類#unlockInnerAsync方法
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //如果不存在鎖
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            //當前線程並沒有持有鎖,則返回nil
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            //前線程持有鎖,則對value-1,拿到-1之後的vlaue
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            //value>0,以毫秒為單位返回剩下的過期時間。(保證可重入)
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            //value<=0,則對key進行刪除操作,return 1 (方法返回 true)。然後進行redis-pub指令,用於喚醒其他正在休眠的線程。
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            //參數順序KEYS[1](鎖的名稱),KEYS[2](發佈訂閱的Channel名:redisson_lock__channel+鎖名),ARGV[1](發佈的消息),ARGV[2](鎖超時時間),ARGV[3](uuid + ":" + threadId)
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

 

    7)Redisson類#tryLock方法

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
        newLeaseTime = unit.toMillis(waitTime)*2;
    }
    
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    
    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            lockAcquired = false;
        }
        
        if (lockAcquired) {
            acquiredLocks.add(lock);
        } else {
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }
        
        if (remainTime != -1) {
            remainTime -= (System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }

    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
    
    return true;
}

 

 

Redis與Zookeeper分散式鎖的區別

  1.從單機角度上來說,兩者差別不大,都是項目引入的外部組件,redis相對於zookeeper來說,項目中使用的更多,常用性角度redis更加。

  2.但是一般我們都會做集群(容錯率更高):

    【1】從分散式的CAP角度分析:

        redis滿足AP,在Master節點上寫成功了會優先返回給客戶端,之後在同步給從節點

        zookeeper滿足CP,在Master節點上寫成功了會優先同步給從節點【ZAB協議(半數以上寫成功)】,之後在返回給客戶端

    【2】主從架構鎖失效問題:

        redis會出現,因為從節點變成主節點時,會出現丟失數據的問題。

        zookeeper不會出現,因為從節點變成主節點時,不會會出現丟失數據的問題。

    【3】集群下性能角度:

        redis性能會高於zookeeper,同步是個耗時的操作(而且這個過程中還是相當於阻塞線程),併發越高的情況,我們想要的是耗時越少的越好。

   3.選redis還是zk實現分散式鎖

    首先zk的性能肯定不如redis,但是從分散式鎖的角度語義上來說,zk可能更適合一些,所以如果對性能要求比較高的話就選redis,對數據的強一致性有特別嚴格要求的話就選zk,現在的主流的分散式鎖方案還是redis,也有一些辦法去減少redis主從架構鎖失效問題。

 

如何提升分散式鎖性能

  問題分析

  1.分散式鎖為我們解決了併發問題,但是其底層思路是將並行執行的請求給串列化了,因為redis是單線程執行任務的,肯定就不會有併發問題了。

  2.但是這種設計本身是與我們高併發的需求是衝突的。但是某些場景下我們又不得不用,所以我們應該基於場景做一些優化。

  3.正如阿裡巴巴Java開發手冊裡面寫到:

6. 【強制】高併發時,同步調用應該去考量鎖的性能損耗。能用無鎖數據結構,就不要用鎖;能鎖區塊,就不要鎖整個方法體;能用對象鎖,就不要用類鎖。

說明:儘可能使加鎖的代碼塊工作量儘可能的小,避免在鎖代碼塊中調用 RPC 方法。

7. 【強制】對多個資源、資料庫表、對象同時加鎖時,需要保持一致的加鎖順序,否則可能會造成死鎖。

說明:線程一需要對錶 A、B、C 依次全部加鎖後才可以進行更新操作,那麼線程二的加鎖順序也必須是 A、B、C,否則可能出現死鎖。


8. 【強制】併發修改同一記錄時,避免更新丟失,需要加鎖。要麼在應用層加鎖,要麼在緩存加鎖,要麼在資料庫層使用樂觀鎖,使用 version 作為更新依據。

說明:如果每次訪問衝突概率小於 20%,推薦使用樂觀鎖,否則使用悲觀鎖。樂觀鎖的重試次數不得小於 3 次。

 

  4.所以我們優先從鎖的粒度開始,鎖是否合適,加鎖的範圍是否

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ShadingSphere ​ShardingSphere是一款起源於噹噹網內部的應用框架,2015年在噹噹網內部誕生,2016年由主要開發人員張亮帶入京東數科,在國內經歷了噹噹網、電信翼支付、京東數科等多家大型互聯網企業的考驗,在2017年開源。 並逐漸由原本只關註於關係型資料庫增強工具的Shar ...
  • springboot自動配置原理以及手動實現配置類 1、原理 spring有一個思想是“約定大於配置”。 配置類自動配置可以幫助開發人員更加專註於業務邏輯開發,springboot在啟動的時候可以把一些配置類自動註入到spring的IOC容器里,項目運行之後就可以直接使用這些配置類的屬性和方法等。 ...
  • 簡述 類型:創建型 目的:實現對客戶端中對象的平替。 我們藉以下案例來說說如何使用工廠方法模式平替對象。 優化案例 最初版 public interface OS { public void start(); public void sleep(); public void restart(); p ...
  • Java基礎之變數 1.變數概述 1.1 為什麼需要變數 不論是使用哪種高級語言編寫程式,變數都是其程式的基本組成單位。變數有三個基本要素:類型、名稱、值。 class Test{ public static void main(String []args){ int a = 1;//定義一個變數, ...
  • time庫的使用:Python中內置了一些與時間處理相關的庫,如time、datatime和calendar庫。 其中time庫是Python中處理時間的標準庫,是最基礎的時間處理庫。 time庫的功能如下: (1)電腦時間的表達 (2)提供獲取系統時間並格式化輸出功能 (3)提供系統級精確計時功 ...
  • csv的簡單介紹 CSV (Comma Separated Values),即逗號分隔值(也稱字元分隔值,因為分隔符可以不是逗號),是一種常用的文本格式,用以存儲表格數據,包括數字或者字元。很多程式在處理數據時都會碰到csv這種格式的文件。python自帶了csv模塊,專門用於處理csv文件的讀取 ...
  • 1、任務介紹 需求分析 爬取豆瓣電影Top250的基本信息,包括電影的名稱,豆瓣評分,評價數,電影概況,電影鏈接等。 https://movie.douban.com/top250 2、基本流程 2.1、準備工作 通過瀏覽器查看分析目標網頁,學習編程基礎規範 與Java的一些區別,Python沒有主 ...
  • 2022-09-30 F對象: 在shell中是用於兩個有關聯的屬性之間的查詢。 使用實例: 查詢書籍表中閱讀量大於評論量的記錄 前提,進入pycharm,進入虛擬環境,進入shell環境。 首先,要使用F對象,那麼就需要導入F對象 from django.db.models import F 後進 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...