概述 單機架構下,一個進程中的多個線程競爭同一共用資源時,通常使用 JVM 級別的鎖即可保證互斥,以對商品下單並扣庫存為例: public String deductStock() { synchronized (this){ // 獲取庫存值 int stock = Integer.parseIn ...
概述
單機架構下,一個進程中的多個線程競爭同一共用資源時,通常使用 JVM 級別的鎖即可保證互斥,以對商品下單並扣庫存為例:
public String deductStock() {
synchronized (this){
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "")
System.out.println("扣減成功,剩餘庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
}
return "end";
}
然而,當使用分散式架構時,這種方式就不管用了,因為 JVM 鎖只能控制自家應用,其他機器的應用時管不了的,這時候分散式鎖就派上用場了,它能保證分散式系統下不同進程對共用資源訪問的互斥性
案例分析
下麵對使用 Redis 實現分散式鎖的案例進行分析:
1. Case1
使用 Redis 中的 setnx()
設計一個入門級別的分散式鎖
public String deductStock1() {
String localKey = "lock:product:0001";
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩餘庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
// 即使中間的任何一處邏輯拋出異常,也能保證鎖釋放
stringRedisTemplate.delete(localKey);
}
return "end";
}
存在的問題:鎖沒有釋放,機器卻宕機了,這時候其他機器將無法獲取鎖
2. Case2
設置一個過期時間,解決 Case1 中存在的宕機沒有釋放鎖的問題
public String deductStock2() {
String localKey = "lock:product:0001";
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true");
stringRedisTemplate.expire(localKey,10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩餘庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
stringRedisTemplate.delete(localKey);
}
return "end";
}
存在的問題:有可能還沒有執行到 expire()
就宕機了,沒有保證原子性
3. Case3
在加鎖時就設置超時時間,保證加鎖和設置超時時間是原子操作
public String deductStock3() {
String localKey = "lock:product:0001";
// 這條命令能夠保證原子性
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, "true",10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩餘庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
stringRedisTemplate.delete(localKey);
}
return "end";
}
存在問題:如果系統併發量不是特別的大,那麼問題不大,但如果併發量很大,就會出現嚴重的併發問題:
- 假設線程 A 的時間超過了超時時間,鎖失效了,此時該線程 A 還沒有執行 delete 方法
- 線程 B 這時候加鎖成功了,與此同時線程 A 執行了 delete 方法,但是這時候線程 A 釋放的鎖是線程 B 的
- 於是極端情況下就會出現:線程 A 釋放線程 B 的鎖,B 釋放 C 的,C 釋放 D 的 ......
4. Case4
Case3 存在的問題的根本原因就是在執行 delete 方法的時候,自己的鎖被其他的線程釋放了,所以解決辦法就是給每個線程生成一個唯一 ID,在最後釋放鎖的時候判斷是否是自己的鎖,如果是自己的才釋放
public String deductStock4() {
String localKey = "lock:product:0001";
String uuid = UUID.randomUUID().toString();
// 這條命令能夠保證原子性
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩餘庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
stringRedisTemplate.delete(localKey);
}
}
return "end";
}
存在問題:存在原子性問題,問題代碼如下:
if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
stringRedisTemplate.delete(localKey);
}
有可能出現當前線程執行完 if 判斷卻還沒執行 delete 操作的時候當前鎖過期了,於是又會出現當前線程釋放了其他線程的鎖的情況
5. Case5
對於 Case4 的問題,本質是 「判斷是不是當前線程加的鎖」和「釋放鎖」不是一個原子操作,可以用 Lua 腳本代替,Redis 會將整個腳本作為一個整體執行
String redisScript = "
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end;"
public String deductStock5() {
String localKey = "lock:product:0001";
String uuid = UUID.randomUUID().toString();
// 這條命令能夠保證原子性
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩餘庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
redisTemplate.execute(redisScript, Arrays.asList(localKey), uuid);
}
return "end";
}
也可以使用鎖續命的方式解決,即創建一個守護線程,每過一段時間,判斷業務的主線程有沒有結束(是否還加著鎖),如果還加著鎖,將鎖的超時時間重新設置
public String deductStock5() {
String localKey = "lock:product:0001";
String uuid = UUID.randomUUID().toString();
// 這條命令能夠保證原子性
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(localKey, uuid,10,TimeUnit.SECONDS);
if (!aBoolean){
return "當前系統繁忙";
} else {
// 續命
Thread demo = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Boolean expire = redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
// 有可能已經主動刪除key,不需要在續命
if(!expire){
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
demo.setDaemon(true);
demo.start();
}
try {
// 獲取庫存值
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣減成功,剩餘庫存:" + realStock);
} else {
System.out.println("扣減失敗,庫存不足");
}
} finally {
if (uuid.equals(stringRedisTemplate.opsForValue().get(localKey))){
stringRedisTemplate.delete(localKey);
}
}
return "end";
}