介紹了分散式鎖的特性,模擬想要實現redis分散式鎖的演變流程,分析redisson源碼是如何實現分散式鎖的,面對高併發下,我們該如何提升分散式鎖性能 ...
為什麼需要分散式鎖
1.為瞭解決Java共用記憶體模型帶來的線程安全問題,我們可以通過加鎖來保證資源訪問的單一,如JVM內置鎖synchronized,類級別的鎖ReentrantLock。
2.但是隨著業務的發展,單機服務畢竟存在著限制,故會往多台組合形成集群架構,面對集群架構,我們同樣存在則資源共用問題,而每台伺服器有著自己的JVM,這時候我們對於鎖的實現不得不考慮分散式的實現。
分散式鎖應該具備哪些條件
1.在分散式系統環境下,一個方法在同一時間只能被一個機器的一個線程執行
2.高可用的獲取鎖與釋放鎖
3.高性能的獲取鎖與釋放鎖
4.具備可重入特性(可理解為重新進入,由多於一個任務併發使用,而不必擔心數據錯誤)
5.具備鎖失效機制,即自動解鎖,防止死鎖
6.具備非阻塞鎖特性,即沒有獲取到鎖將直接返回獲取鎖失敗
秒殺搶購場景模擬(模擬併發問題:其實就是指每一步如果存在間隔時間,那麼當某一線程間隔時間拉長,會對其餘線程造成什麼影響)
0.如果要在本機測試的話
1)配置Nginx實現負載均衡
http { upstream testfuzai { server 127.0.0.1:8080 weight=1; server 127.0.0.1:8090 weight=1; } server { listen 80; server_name localhost; location / { //proxy_pass:設置後端代理伺服器的地址。這個地址(address)可以是一個功能變數名稱或ip地址和埠,或者一個 unix-domain socket路徑。 proxy_pass http://testfuzai; proxy_set_header Host $proxy_host; } } }
2)啟動redis設置好參數與數量
3)啟動項目並分別配置不同埠(要與Nginx裡面的一致)
4)進行壓測,通過jmeter的Thread Group裡面編輯好HTTP Request,設置參數 線程數 Number of Threads 【設置為200】 ,請求的重覆次數 Loop count 【設置為5】 ,Ramp-up period(seconds)線程啟動開始運行的時間間隔(單位是秒)【設置為1】。則,一秒內會有1000個請求打過去。
1.不加鎖進行庫存扣減的情況:
代碼示例
@RequestMapping("/deduct_stock") public String deductStock() { //從redis取出庫存 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; //往redis寫入庫存 stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("扣減成功,剩餘庫存:" + realStock); } else { System.out.println("扣減失敗,庫存不足"); } return "end"; }
發現說明
1)通過列印輸出,我們會發現兩台機器上會出現重覆的值(即出現了超賣現象)。甚至會出現另一臺伺服器的數據覆蓋本伺服器的數據。
2)原因在於讀取數據和寫入數據存在時間差,如兩個伺服器Q1和Q1,Q1有請求,獲取庫存【假設300】,在庫存判斷大小之後進行扣減庫存如果慢了【假設需要3秒】,那麼Q2有5次請求,獲取到庫存,扣減完後設置,依次5次,則庫存為【295】。但是此時Q1完成自身請求又會把庫存設置為【299】。故不合理。所以應該改為使用stringRedisTemplate.boundValueOps("stock").increment(-1); 改為採用redis內部扣除,減少了超賣的個數。但是就算改了也只是避免了覆蓋問題,仍然沒有解決超賣問題。如果有6台伺服器,庫存剩下1個的時候六個請求同時進入到扣減庫存這一步,那麼就會出現超賣5個的現象(這也是超賣個數最多的現象)。
2.採用SETNX的方式加分散式鎖的情況:
代碼示例
public String deductStock() { String lockKey = "lock:product_101"; Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
if (!result) { return "error_code"; } try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock"); System.out.println("扣減成功,剩餘庫存:" + realStock); } else { System.out.println("扣減失敗,庫存不足"); } } finally { stringRedisTemplate.delete(lockKey); } return "end"; }
發現說明
1)這種方式明顯保證了在分散式情況下只有一個線程能夠執行業務代碼。但是我們不可能對於用戶買商品的時候返回錯誤提示,如果不斷自旋的話又容易讓CPU飆升。肯定要考慮休眠與喚醒,但可以在上層方法裡面處理。
2)同時很明顯存在個問題,如果我在扣減庫存時候伺服器宕機了,庫存扣減還沒設置【且沒執行finally代碼,那麼我這個商品的鎖就不會被釋放,除非手動清除】。
那麼肯定需要設置超時時間。如
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
會發現補一個超時時間的話依舊無法避免之前的問題,故加鎖和設置超時時間需要保持原子性。
3)採用原子操作:Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);基於設置了超時時間,那麼我們如何考量超時時間呢,業務執行多久我們根本不可得知。故容易出現時間到期了,業務還沒執行完。這就容易出現A持有鎖執行任務,還沒完成就超時了,B持有鎖執行任務,A執行完,釋放鎖【此時會釋放B的鎖】的情況。所以釋放鎖必須要持有鎖本人才能執行。
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); }
所以clientId需要是分散式ID,然後釋放鎖改為判斷clientId符合才能去釋放。
3.改進之後的情況:
代碼示例
public String deductStock() { String lockKey = "lock:product_101"; String clientId = UUID.randomUUID().toString(); Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); if (!result) { return "error_code"; } try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) { Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock"); System.out.println("扣減成功,剩餘庫存:" + realStock); } else { System.out.println("扣減失敗,庫存不足"); } } finally { if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } return "end"; }
發現說明
1)即時加了判斷,我們會發現依舊會存在問題【因為判斷與釋放鎖操作不是原子性的】,如果在判斷裡面加上休眠進行試驗
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { Thread.sleep(20000); stringRedisTemplate.delete(lockKey); }
我們會發現根本問題依舊沒有解決,只是減少了發生的情況。究其原因,本質上還是鎖超時導致的。解決這個問題就要引入一個完美的解決方案叫做鎖續命。
2)鎖續命(watchDog):假設主線程搶到鎖開始執行業務邏輯,開啟一個分線程,在分線程裡邊做一個定時任務,比如說設置的鎖超時時間是30s,那麼我們的定時任務時間就設置為10s,定時任務設置的時間一定要比鎖超時時間小,每10s定時任務先去判斷主線程有沒有結束,沒有結束的話說明主線程就還在,還在進行業務邏輯操作,這個時候我們執行一條expire命令,將主線程鎖的超時時間重新設置為30s,這樣的話只要主線程還沒結束,主線程就會被分線程定時任務去做續命邏輯,維持在30s,判斷主線程結束,就不再執行續命邏輯。
Redisson分散式鎖框架剖析
1.引入依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.6.5</version> </dependency>
2.進行配置
@Bean public Redisson redisson() { // 此為單機模式 Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0); return (Redisson) Redisson.create(config); }
3.業務代碼展示
public String deductStock2() { String lockKey = "lock:product_101"; //獲取鎖對象 RLock redissonLock = redisson.getLock(lockKey); //加分散式鎖 redissonLock.lock(); try { Long stock = (Long) stringRedisTemplate.opsForValue().decrement("stock"); if (stock > 0) { Long realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("扣減成功,剩餘庫存:" + realStock); } else { System.out.println("扣減失敗,庫存不足"); } } finally { //解鎖 redissonLock.unlock(); } return "end"; }
發現說明
1.如果在集群架構下麵,分散式鎖如果在Master節點上寫成功了就會返回給客戶端,但是此時還需要同步給從節點。
2.如果在此時間內Master節點結點宕機,那麼數據將會消失,而從節點上沒有鎖的信息(變為Master節點)。【主從架構鎖失效問題】
4.為解決主從架構鎖失效問題引入的RedLock(不建議用,因為本質上還是沒有解決主從架構鎖失效問題)
0.原理展示
1.redssion 集群配置(在resource下創建 redssion.yml文件)
clusterServersConfig: # 連接空閑超時,單位:毫秒 預設10000 idleConnectionTimeout: 10000 pingTimeout: 1000 # 同任何節點建立連接時的等待超時。時間單位是毫秒 預設10000 connectTimeout: 10000 # 等待節點回覆命令的時間。該時間從命令發送成功時開始計時。預設3000 timeout: 3000 # 命令失敗重試次數 retryAttempts: 3 # 命令重試發送時間間隔,單位:毫秒 retryInterval: 1500 # 重新連接時間間隔,單位:毫秒 reconnectionTimeout: 3000 # 執行失敗最大次數 failedAttempts: 3 # 密碼 password: test1234 # 單個連接最大訂閱數量 subscriptionsPerConnection: 5 clientName: null # loadBalancer 負載均衡演算法類的選擇 loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {} #從節點發佈和訂閱連接的最小空閑連接數 slaveSubscriptionConnectionMinimumIdleSize: 1 #從節點發佈和訂閱連接池大小 預設值50 slaveSubscriptionConnectionPoolSize: 50 # 從節點最小空閑連接數 預設值32 slaveConnectionMinimumIdleSize: 32 # 從節點連接池大小 預設64 slaveConnectionPoolSize: 64 # 主節點最小空閑連接數 預設32 masterConnectionMinimumIdleSize: 32 # 主節點連接池大小 預設64 masterConnectionPoolSize: 64 # 訂閱操作的負載均衡模式 subscriptionMode: SLAVE # 只在從伺服器讀取 readMode: SLAVE # 集群地址 nodeAddresses: - "redis://IP地址:30001" // - "redis://IP地址:30002" - "redis://IP地址:30003" - "redis://IP地址:30004" - "redis://IP地址:30005" - "redis://IP地址:30006" # 對Redis集群節點狀態掃描的時間間隔。單位是毫秒。預設1000 scanInterval: 1000 #這個線程池數量被所有RTopic對象監聽器,RRemoteService調用者和RExecutorService任務共同共用。預設2 threads: 0 #這個線程池數量是在一個Redisson實例內,被其創建的所有分散式數據類型和服務,以及底層客戶端所一同共用的線程池裡保存的線程數量。預設2 nettyThreads: 0 # 編碼方式 預設org.redisson.codec.JsonJacksonCodec codec: !<org.redisson.codec.JsonJacksonCodec> {} #傳輸模式 transportMode: NIO # 分散式鎖自動過期時間,防止死鎖,預設30000 lockWatchdogTimeout: 30000 # 通過該參數來修改是否按訂閱發佈消息的接收順序出來消息,如果選否將對消息實行並行處理,該參數只適用於訂閱發佈消息的情況, 預設true keepPubSubOrder: true # 用來指定高性能引擎的行為。由於該變數值的選用與使用場景息息相關(NORMAL除外)我們建議對每個參數值都進行嘗試。
2.代碼配置
@Bean public RedissonClient redisson() throws IOException { Config config = Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream()); RedissonClient redisson = Redisson.create(config); return redisson; } //或者 @Bean public Redisson redisson() { // 此為集群模式 Config config = new Config(); config.useClusterServers() .addNodeAddress("redis://127.0.0.1:6379") .addNodeAddress("redis://127.0.0.1:6389") .addNodeAddress("redis://127.0.0.1:6399") .addNodeAddress("redis://127.0.0.1:6369"); return (Redisson) Redisson.create(config); }
3.業務代碼示例
@RequestMapping("/redlock") public String redlock() { RLock lock1 = redisson.getLock("Key1_product_001"); RLock lock2 = redisson.getLock("Key2_product_001"); RLock lock3 = redisson.getLock("Key3_product_001"); /** * 根據多個 RLock 對象構建 RedissonRedLock (最核心的差別就在這裡) */ RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3); try { /** * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗 * leaseTime 鎖的持有時間,超過這個時間鎖會自動失效(值應設置為大於業務處理的時間,確保在鎖有效期內業務能處理完) */ boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS); if (res) { //成功獲得鎖,在這裡處理業務 } } catch (Exception e) { throw new RuntimeException("lock fail"); } finally { //無論如何, 最後都要解鎖 redLock.unlock(); } return "end"; }
4.分析說明(為什麼不推薦用)
1)如果不是集群,為保證高可用,要對三個節點都添加了從節點(因為如果沒有從節點,線上只要有兩個服務宕機了,那麼這個分散式鎖將不再可用)
2)針對三主三從的情況,A線程對redis_1_主 和 redis_2_主 加鎖成功,對 redis_3_主 加鎖失敗,則可以獲得分散式鎖,執行任務。但是還沒同步情況下,redis_1_主宕機,redis_1_從 晉升成功數據丟失,此時B線程來加鎖,redis_1_從加鎖成功和 redis_3_主 加鎖成功,對 redis_2_主 加鎖失敗,也能獲得分散式鎖。【概率不大但還是會存在問題】
3)針對集群如果不搞主從【一旦出現宕機,數據量大,且訪問高的話,這裡面就存在著緩存雪崩的危機】,此外如果集群半數節點宕機,集群會被迫停了,此外如果加鎖節點越多,加鎖效率越低下。
4)既然原理與zookeeper的差不多而且也損失了高性能的特性,那其實還不如使用zookeeper分散式鎖。
5.原理分析
6.源碼剖析
1)Redisson類#getLock方法
public RLock getLock(String name) { return new RedissonLock(this.connectionManager.getCommandExecutor(), name); } public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); }
2)Redisson類#lock方法
public void lock() { try { lockInterruptibly(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } //RedissonLock類#lockInterruptibly方法 public void lockInterruptibly() throws InterruptedException { lockInterruptibly(-1, null); } //RedissonLock類#lockInterruptibly方法 public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } //先在redis中發佈訂閱消息,等待用完鎖的線程通知 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { //再次嘗試獲取鎖 ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null) { break; } if (ttl >= 0) { //利用 Semaphore 信號量的方式獲得許可,但是這種休眠是定時的 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } } //RedissonLock類#tryAcquire方法 //利用future的方式阻塞式等待返回結果 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } //RedissonObject類#get方法 protected <V> V get(RFuture<V> future) { return commandExecutor.get(future); } //RedissonLock類#subscribe方法 protected RFuture<RedissonLockEntry> subscribe(long threadId) { return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } //PublishSubscribe類#subscribe方法 public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) { final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>(); final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); final RPromise<E> newPromise = new RedissonPromise<E>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { return semaphore.remove(listenerHolder.get()); } }; Runnable listener = new Runnable() { @Override public void run() { // 1:判斷RedisLockEntry 是否存在 E entry = entries.get(entryName); if (entry != null) { entry.aquire(); semaphore.release(); entry.getPromise().addListener(new TransferListener<E>(newPromise)); return; } // 2:創建RedisLockEntry E value = createEntry(newPromise); value.aquire(); E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null) { oldValue.aquire(); semaphore.release(); oldValue.getPromise().addListener(new TransferListener<E>(newPromise)); return; } // 3:創建一個監聽器,別的線程進行redis-pub命令之後進行調用 RedisPubSubListener<Object> listener = createListener(channelName, value); // 4:底層交給netty調用redis-sub命令 subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); } }; semaphore.acquire(listener); listenerHolder.set(listener); return newPromise; }
3)RedissonLock類#tryAcquireAsync方法(核心點主體)
//RedissonLock類#tryAcquireAsync方法 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //嘗試加鎖邏輯 RFuture<Long> ttlRemainingFuture=tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //添加監聽器 ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override //Future任務執行完會回調該方法 public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // 加鎖成功 if (ttlRemaining == null) { //看門狗續命 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
4)RedissonLock類#tryLockInnerAsync方法(核心點,加鎖邏輯)
//RedissonLock類#tryLockInnerAsync方法 //利用redis的單線程執行任務,redis會將整個腳本作為一個整體執行,且中間不會被其他命令插入 //採用的是hash的類型來存儲鎖,為了實現重入鎖的概念 //Redis pttl命令以毫秒為單位返回 key 的剩餘過期時間 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", //對應為KEYS[1](對應傳入的鎖的命名),ARGV[1](設置的超時時間,預設30s) ,ARGV[2] -》(uuid + ":" + threadId) Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
5)RedissonLock類#scheduleExpirationRenewal方法(核心點,看門狗的邏輯【續命】)
//RedissonLock類#scheduleExpirationRenewal方法 //採用Future+事件監聽的方式,方法嵌套調用來實現定時任務 private void scheduleExpirationRenewal(final long threadId) { if (expirationRenewalMap.containsKey(getEntryName())) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); //再次添加監聽器,重覆檢查 future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { expirationRenewalMap.remove(getEntryName()); if (!future.isSuccess()) { log.error("Can't update lock " + getName() + " expiration", future.cause()); return; } if (future.getNow()) { // reschedule itself //遞歸調用 scheduleExpirationRenewal(threadId); } } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); //如果該任務已經存在一個了,就把新建的任務關閉,Map中的key為(uuid + ":" + threadId) if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { task.cancel(); } }
6)Redisson類#unlock方法
//RedissonLock類#unlock方法 public void unlock() { Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException(...); } if (opStatus) { //移除看門狗的定時任務 cancelExpirationRenewal(); } } //RedissonLock類#unlockInnerAsync方法 protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //如果不存在鎖 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + //當前線程並沒有持有鎖,則返回nil "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + //前線程持有鎖,則對value-1,拿到-1之後的vlaue "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //value>0,以毫秒為單位返回剩下的過期時間。(保證可重入) "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + //value<=0,則對key進行刪除操作,return 1 (方法返回 true)。然後進行redis-pub指令,用於喚醒其他正在休眠的線程。 "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", //參數順序KEYS[1](鎖的名稱),KEYS[2](發佈訂閱的Channel名:redisson_lock__channel+鎖名),ARGV[1](發佈的消息),ARGV[2](鎖超時時間),ARGV[3](uuid + ":" + threadId) Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
7)Redisson類#tryLock方法
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { newLeaseTime = unit.toMillis(waitTime)*2; } long time = System.currentTimeMillis(); long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } long lockWaitTime = calcLockWaitTime(remainTime); int failedLocksLimit = failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size()); for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; try { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (Exception e) { lockAcquired = false; } if (lockAcquired) { acquiredLocks.add(lock); } else { if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } if (failedLocksLimit == 0) { unlockInner(acquiredLocks); if (waitTime == -1 && leaseTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } } else { failedLocksLimit--; } } if (remainTime != -1) { remainTime -= (System.currentTimeMillis() - time); time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } if (leaseTime != -1) { List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size()); for (RLock rLock : acquiredLocks) { RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } for (RFuture<Boolean> rFuture : futures) { rFuture.syncUninterruptibly(); } } return true; }
Redis與Zookeeper分散式鎖的區別
1.從單機角度上來說,兩者差別不大,都是項目引入的外部組件,redis相對於zookeeper來說,項目中使用的更多,常用性角度redis更加。
2.但是一般我們都會做集群(容錯率更高):
【1】從分散式的CAP角度分析:
redis滿足AP,在Master節點上寫成功了會優先返回給客戶端,之後在同步給從節點
zookeeper滿足CP,在Master節點上寫成功了會優先同步給從節點【ZAB協議(半數以上寫成功)】,之後在返回給客戶端
【2】主從架構鎖失效問題:
redis會出現,因為從節點變成主節點時,會出現丟失數據的問題。
zookeeper不會出現,因為從節點變成主節點時,不會會出現丟失數據的問題。
【3】集群下性能角度:
redis性能會高於zookeeper,同步是個耗時的操作(而且這個過程中還是相當於阻塞線程),併發越高的情況,我們想要的是耗時越少的越好。
3.選redis還是zk實現分散式鎖:
首先zk的性能肯定不如redis,但是從分散式鎖的角度語義上來說,zk可能更適合一些,所以如果對性能要求比較高的話就選redis,對數據的強一致性有特別嚴格要求的話就選zk,現在的主流的分散式鎖方案還是redis,也有一些辦法去減少redis主從架構鎖失效問題。
如何提升分散式鎖性能
問題分析
1.分散式鎖為我們解決了併發問題,但是其底層思路是將並行執行的請求給串列化了,因為redis是單線程執行任務的,肯定就不會有併發問題了。
2.但是這種設計本身是與我們高併發的需求是衝突的。但是某些場景下我們又不得不用,所以我們應該基於場景做一些優化。
3.正如阿裡巴巴Java開發手冊裡面寫到:
6. 【強制】高併發時,同步調用應該去考量鎖的性能損耗。能用無鎖數據結構,就不要用鎖;能鎖區塊,就不要鎖整個方法體;能用對象鎖,就不要用類鎖。 說明:儘可能使加鎖的代碼塊工作量儘可能的小,避免在鎖代碼塊中調用 RPC 方法。 7. 【強制】對多個資源、資料庫表、對象同時加鎖時,需要保持一致的加鎖順序,否則可能會造成死鎖。 說明:線程一需要對錶 A、B、C 依次全部加鎖後才可以進行更新操作,那麼線程二的加鎖順序也必須是 A、B、C,否則可能出現死鎖。 8. 【強制】併發修改同一記錄時,避免更新丟失,需要加鎖。要麼在應用層加鎖,要麼在緩存加鎖,要麼在資料庫層使用樂觀鎖,使用 version 作為更新依據。 說明:如果每次訪問衝突概率小於 20%,推薦使用樂觀鎖,否則使用悲觀鎖。樂觀鎖的重試次數不得小於 3 次。
4.所以我們優先從鎖的粒度開始,鎖是否合適,加鎖的範圍是否