1.分散式鎖介紹 在電腦系統中,鎖作為一種控制併發的機制無處不在。 單機環境下,操作系統能夠在進程或線程之間通過本地的鎖來控制併發程式的行為。而在如今的大型複雜系統中,通常採用的是分散式架構提供服務。 分散式環境下,基於本地單機的鎖無法控制分散式系統中分開部署客戶端的併發行為,此時分散式鎖就應運而 ...
1.分散式鎖介紹
在電腦系統中,鎖作為一種控制併發的機制無處不在。
單機環境下,操作系統能夠在進程或線程之間通過本地的鎖來控制併發程式的行為。而在如今的大型複雜系統中,通常採用的是分散式架構提供服務。
分散式環境下,基於本地單機的鎖無法控制分散式系統中分開部署客戶端的併發行為,此時分散式鎖就應運而生了。
一個可靠的分散式鎖應該具備以下特性:
1.互斥性:作為鎖,需要保證任何時刻只能有一個客戶端(用戶)持有鎖
2.可重入: 同一個客戶端在獲得鎖後,可以再次進行加鎖
3.高可用:獲取鎖和釋放鎖的效率較高,不會出現單點故障
4.自動重試機制:當客戶端加鎖失敗時,能夠提供一種機制讓客戶端自動重試
2.分散式鎖api介面
/** * 分散式鎖 api介面 */ public interface DistributeLock { /** * 嘗試加鎖 * @param lockKey 鎖的key * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lock(String lockKey); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param expireTime 過期時間 單位:秒 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lock(String lockKey, int expireTime); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param requestID 用戶ID * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lock(String lockKey, String requestID); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param requestID 用戶ID * @param expireTime 過期時間 單位:秒 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lock(String lockKey, String requestID, int expireTime); /** * 嘗試加鎖,失敗自動重試 會阻塞當前線程 * @param lockKey 鎖的key * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey); /** * 嘗試加鎖,失敗自動重試 會阻塞當前線程 (requestID相等 可重入) * @param lockKey 鎖的key * @param requestID 用戶ID * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, String requestID); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param expireTime 過期時間 單位:秒 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, int expireTime); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param expireTime 過期時間 單位:秒 * @param retryCount 重試次數 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, int expireTime, int retryCount); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param requestID 用戶ID * @param expireTime 過期時間 單位:秒 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, String requestID, int expireTime); /** * 嘗試加鎖 (requestID相等 可重入) * @param lockKey 鎖的key * @param expireTime 過期時間 單位:秒 * @param requestID 用戶ID * @param retryCount 重試次數 * @return 加鎖成功 返回uuid * 加鎖失敗 返回null * */ String lockAndRetry(String lockKey, String requestID, int expireTime, int retryCount); /** * 釋放鎖 * @param lockKey 鎖的key * @param requestID 用戶ID * @return true 釋放自己所持有的鎖 成功 * false 釋放自己所持有的鎖 失敗 * */ boolean unLock(String lockKey, String requestID); }
3.基於redis的分散式鎖的簡單實現
3.1 基礎代碼
當前實現版本的分散式鎖基於redis實現,使用的是jedis連接池來和redis進行交互,並將其封裝為redisClient工具類(僅封裝了demo所需的少數介面)
redisClient工具類:
public class RedisClient { private static final Logger LOGGER = LoggerFactory.getLogger(RedisClient.class); private JedisPool pool; private static RedisClient instance = new RedisClient(); private RedisClient() { init(); } public static RedisClient getInstance(){ return instance; } public Object eval(String script, List<String> keys, List<String> args) { Jedis jedis = getJedis(); Object result = jedis.eval(script, keys, args); jedis.close(); return result; } public String get(final String key){ Jedis jedis = getJedis(); String result = jedis.get(key); jedis.close(); return result; } public String set(final String key, final String value, final String nxxx, final String expx, final int time) { Jedis jedis = getJedis(); String result = jedis.set(key, value, nxxx, expx, time); jedis.close(); return result; } private void init(){ Properties redisConfig = PropsUtil.loadProps("redis.properties"); int maxTotal = PropsUtil.getInt(redisConfig,"maxTotal",10); String ip = PropsUtil.getString(redisConfig,"ip","127.0.0.1"); int port = PropsUtil.getInt(redisConfig,"port",6379); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(maxTotal); pool = new JedisPool(jedisPoolConfig, ip,port); LOGGER.info("連接池初始化成功 ip={}, port={}, maxTotal={}",ip,port,maxTotal); } private Jedis getJedis(){ return pool.getResource(); } }View Code
所依賴的工具類:
package util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * @Author xiongyx * @Create 2018/4/11. */ public final class PropsUtil { private static final Logger LOGGER = LoggerFactory.getLogger(PropsUtil.class); /** * 讀取配置文件 * */ public static Properties loadProps(String fileName){ Properties props = null; InputStream is = null; try{ //:::絕對路徑獲得輸入流 is = Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName); if(is == null){ //:::沒找到文件,拋出異常 throw new FileNotFoundException(fileName + " is not found"); } props = new Properties(); props.load(is); }catch(IOException e){ LOGGER.error("load propertis file fail",e); }finally { if(is != null){ try{ //:::關閉io流 is.close(); } catch (IOException e) { LOGGER.error("close input Stream fail",e); } } } return props; } /** * 獲取字元串屬性(預設為空字元串) * */ public static String getString(Properties properties,String key){ //:::調用重載函數 預設值為:空字元串 return getString(properties,key,""); } /** * 獲取字元串屬性 * */ public static String getString(Properties properties,String key,String defaultValue){ //:::key對應的value數據是否存在 if(properties.containsKey(key)){ return properties.getProperty(key); }else{ return defaultValue; } } /** * 獲取int屬性 預設值為0 * */ public static int getInt(Properties properties,String key){ //:::調用重載函數,預設為:0 return getInt(properties,key,0); } /** * 獲取int屬性 * */ public static int getInt(Properties properties,String key,int defaultValue){ //:::key對應的value數據是否存在 if(properties.containsKey(key)){ return CastUtil.castToInt(properties.getProperty(key)); }else{ return defaultValue; } } /** * 獲取boolean屬性,預設值為false */ public static boolean getBoolean(Properties properties,String key){ return getBoolean(properties,key,false); } /** * 獲取boolean屬性 */ public static boolean getBoolean(Properties properties,String key,boolean defaultValue){ //:::key對應的value數據是否存在 if(properties.containsKey(key)){ return CastUtil.castToBoolean(properties.getProperty(key)); }else{ return defaultValue; } } } public final class CastUtil { /** * 轉為 string * */ public static String castToString(Object obj){ return castToString(obj,""); } /** * 轉為 string 提供預設值 * */ public static String castToString(Object obj,String defaultValue){ if(obj == null){ return defaultValue; }else{ return obj.toString(); } } /** * 轉為 int * */ public static int castToInt(Object obj){ return castToInt(obj,0); } /** * 轉為 int 提供預設值 * */ public static int castToInt(Object obj,int defaultValue){ if(obj == null){ return defaultValue; }else{ return Integer.parseInt(obj.toString()); } } /** * 轉為 double * */ public static double castToDouble(Object obj){ return castToDouble(obj,0); } /** * 轉為 double 提供預設值 * */ public static double castToDouble(Object obj,double defaultValue){ if(obj == null){ return defaultValue; }else{ return Double.parseDouble(obj.toString()); } } /** * 轉為 long * */ public static long castToLong(Object obj){ return castToLong(obj,0); } /** * 轉為 long 提供預設值 * */ public static long castToLong(Object obj,long defaultValue){ if(obj == null){ return defaultValue; }else{ return Long.parseLong(obj.toString()); } } /** * 轉為 boolean * */ public static boolean castToBoolean(Object obj){ return castToBoolean(obj,false); } /** * 轉為 boolean 提供預設值 * */ public static boolean castToBoolean(Object obj,boolean defaultValue){ if(obj == null){ return defaultValue; }else{ return Boolean.parseBoolean(obj.toString()); } } }View Code
初始化lua腳本 LuaScript.java:
在分散式鎖初始化時,使用init方法讀取lua腳本
public class LuaScript { /** * 加鎖腳本 lock.lua * */ public static String LOCK_SCRIPT = ""; /** * 解鎖腳本 unlock.lua * */ public static String UN_LOCK_SCRIPT = ""; public static void init(){ try { initLockScript(); initUnLockScript(); } catch (IOException e) { throw new RuntimeException(e); } } private static void initLockScript() throws IOException { String filePath = Objects.requireNonNull(LuaScript.class.getClassLoader().getResource("lock.lua")).getPath(); LOCK_SCRIPT = readFile(filePath); } private static void initUnLockScript() throws IOException { String filePath = Objects.requireNonNull(LuaScript.class.getClassLoader().getResource("unlock.lua")).getPath(); UN_LOCK_SCRIPT = readFile(filePath); } private static String readFile(String filePath) throws IOException { try ( FileReader reader = new FileReader(filePath); BufferedReader br = new BufferedReader(reader) ) { String line; StringBuilder stringBuilder = new StringBuilder(); while ((line = br.readLine()) != null) { stringBuilder.append(line).append(System.lineSeparator()); } return stringBuilder.toString(); } } }View Code
單例的RedisDistributeLock基礎屬性
public final class RedisDistributeLock implements DistributeLock { /** * 無限重試 * */ public static final int UN_LIMIT_RETRY = -1; private RedisDistributeLock() { LuaScript.init(); } private static DistributeLock instance = new RedisDistributeLock(); /** * 持有鎖 成功標識 * */ private static final Long ADD_LOCK_SUCCESS = 1L; /** * 釋放鎖 失敗標識 * */ private static final Integer RELEASE_LOCK_SUCCESS = 1; /** * 預設過期時間 單位:秒 * */ private static final int DEFAULT_EXPIRE_TIME_SECOND = 300; /** * 預設加鎖重試時間 單位:毫秒 * */ private static final int DEFAULT_RETRY_FIXED_TIME = 3000; /** * 預設的加鎖浮動時間區間 單位:毫秒 * */ private static final int DEFAULT_RETRY_TIME_RANGE = 1000; /** * 預設的加鎖重試次數 * */ private static final int DEFAULT_RETRY_COUNT = 30; /** * lockCount Key首碼 * */ private static final String LOCK_COUNT_KEY_PREFIX = "lock_count:"; public static DistributeLock getInstance(){ return instance; } }
3.2 加鎖實現
使用redis實現分散式鎖時,加鎖操作必須是原子操作,否則多客戶端併發操作時會導致各種各樣的問題。詳情請見:Redis分散式鎖的正確實現方式。
由於我們實現的是可重入鎖,加鎖過程中需要判斷客戶端ID的正確與否。而redis原生的簡單介面沒法保證一系列邏輯的原子性執行,因此採用了lua腳本來實現加鎖操作。lua腳本可以讓redis在執行時將一連串的操作以原子化的方式執行。
加鎖lua腳本 lock.lua
-- 獲取參數 local requestIDKey = KEYS[1] local lockCountKey = KEYS[2] local currentRequestID = ARGV[1] local expireTimeTTL = ARGV[2] -- setnx 嘗試加鎖 local lockSet = redis.call('setnx',requestIDKey,currentRequestID) if lockSet == 1 then -- 加鎖成功 設置過期時間和重入次數 redis.call('expire',requestIDKey,expireTimeTTL) redis.call('set',lockCountKey,1) redis.call('expire',lockCountKey,expireTimeTTL) return 1 else -- 判斷是否是重入加鎖 local oldRequestID = redis.call('get',requestIDKey) if currentRequestID == oldRequestID then -- 是重入加鎖 redis.call('incr',lockCountKey) -- 重置過期時間 redis.call('expire',requestIDKey,expireTimeTTL) redis.call('expire',lockCountKey,expireTimeTTL) return 1 else -- requestID不一致,加鎖失敗 return 0 end end
加鎖方法實現:
加鎖時,通過判斷eval的返回值來判斷加鎖是否成功。
@Override public String lock(String lockKey) { String uuid = UUID.randomUUID().toString(); return lock(lockKey,uuid); } @Override public String lock(String lockKey, int expireTime) { String uuid = UUID.randomUUID().toString(); return lock(lockKey,uuid,expireTime); } @Override public String lock(String lockKey, String requestID) { return lock(lockKey,requestID,DEFAULT_EXPIRE_TIME_SECOND); } @Override public String lock(String lockKey, String requestID, int expireTime) { RedisClient redisClient = RedisClient.getInstance(); List<String> keyList = Arrays.asList( lockKey, LOCK_COUNT_KEY_PREFIX + lockKey ); List<String> argsList = Arrays.asList( requestID, expireTime + "" ); Long result = (Long)redisClient.eval(LuaScript.LOCK_SCRIPT, keyList, argsList); if(result.equals(ADD_LOCK_SUCCESS)){ return requestID; }else{ return null; } }
3.3 解鎖實現
解鎖操作同樣需要一連串的操作,由於原子化操作的需求,因此同樣使用lua腳本實現解鎖功能。
解鎖lua腳本 unlock.lua
-- 獲取參數 local requestIDKey = KEYS[1] local lockCountKey = KEYS[2] local currentRequestID = ARGV[1] -- 判斷requestID一致性 if redis.call('get', requestIDKey) == currentRequestID then -- requestID相同,重入次數自減 local currentCount = redis.call('decr',lockCountKey) if currentCount == 0 then -- 重入次數為0,刪除鎖 redis.call('del',requestIDKey) redis.call('del',lockCountKey) return 1 else return 0 end else return 0 end
解鎖方法實現:
@Override public boolean unLock(String lockKey, String requestID) { List<String> keyList = Arrays.asList( lockKey, LOCK_COUNT_KEY_PREFIX + lockKey ); List<String> argsList = Collections.singletonList(requestID); Object result = RedisClient.getInstance().eval(LuaScript.UN_LOCK_SCRIPT, keyList, argsList); // 釋放鎖成功 return RELEASE_LOCK_SUCCESS.equals(result); }
3.4 自動重試機制實現
調用lockAndRetry方法進行加鎖時,如果加鎖失敗,則當前客戶端線程會短暫的休眠一段時間,併進行重試。在重試了一定的次數後,會終止重試加鎖操作,從而加鎖失敗。
需要註意的是,加鎖失敗之後的線程休眠時長是"固定值 + 隨機值",引入隨機值的主要目的是防止高併發時大量的客戶端在幾乎同一時間被喚醒併進行加鎖重試,給redis伺服器帶來周期性的、不必要的瞬時壓力。
@Override public String lockAndRetry(String lockKey) { String uuid = UUID.randomUUID().toString(); return lockAndRetry(lockKey,uuid); } @Override public String lockAndRetry(String lockKey, String requestID) { return lockAndRetry(lockKey,requestID,DEFAULT_EXPIRE_TIME_SECOND); } @Override public String lockAndRetry(String lockKey, int expireTime) { String uuid = UUID.randomUUID().toString(); return lockAndRetry(lockKey,uuid,expireTime); } @Override public String lockAndRetry(String lockKey, int expireTime, int retryCount) { String uuid = UUID.randomUUID().toString(); return lockAndRetry(lockKey,uuid,expireTime,retryCount); } @Override public String lockAndRetry(String lockKey, String requestID, int expireTime) { return lockAndRetry(lockKey,requestID,expireTime,DEFAULT_RETRY_COUNT); } @Override public String lockAndRetry(String lockKey, String requestID, int expireTime, int retryCount) { if(retryCount <= 0){ // retryCount小於等於0 無限迴圈,一直嘗試加鎖 while(true){ String result = lock(lockKey,requestID,expireTime); if(result != null){ return result; } // 休眠一會 sleepSomeTime(); } }else{ // retryCount大於0 嘗試指定次數後,退出 for(int i=0; i<retryCount; i++){ String result = lock(lockKey,requestID,expireTime); if(result != null){ return result; } // 休眠一會 sleepSomeTime(); } return null; } }
4.使用註解切麵簡化redis分散式鎖的使用
通過在方法上引入RedisLock註解切麵,讓對應方法被redis分散式鎖管理起來,可以簡化redis分散式鎖的使用。
切麵註解 RedisLock
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RedisLock { /** * 無限重試 * */ int UN_LIMIT_RETRY = RedisDistributeLock.UN_LIMIT_RETRY; String lockKey(); int expireTime(); int retryCount(); }
RedisLock 切麵實現
@Component @Aspect public class RedisLockAspect { private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockAspect.class); private static final ThreadLocal<String> REQUEST_ID_MAP = new ThreadLocal<>(); @Pointcut("@annotation(annotation.RedisLock)") public void annotationPointcut() { } @Around("annotationPointcut()") public Object around(ProceedingJoinPoint joinPoint) throws Throwable { MethodSignature methodSignature = (MethodSignature)joinPoint.getSignature(); Method method = methodSignature.getMethod(); RedisLock annotation = method.getAnnotation(RedisLock.class); boolean lockSuccess = lock(annotation); if(lockSuccess){ Object result = joinPoint.proceed(); unlock(annotation); return result; } return null; } /** * 加鎖 * */ private boolean lock(RedisLock annotation){ DistributeLock distributeLock = RedisDistributeLock.getInstance(); int retryCount = annotation.retryCount(); String requestID = REQUEST_ID_MAP.get(); if(requestID != null){ // 當前線程 已經存在requestID distributeLock.lockAndRetry(annotation.lockKey(),requestID,annotation.expireTime(),retryCount); LOGGER.info("重入加鎖成功 requestID=" + requestID); return true; }else{ // 當前線程 不存在requestID String newRequestID = distributeLock.lockAndRetry(annotation.lockKey(),annotation.expireTime(),retryCount); if(newRequestID != null){ // 加鎖成功,設置新的requestID REQUEST_ID_MAP.set(newRequestID); LOGGER.info("加鎖成功 newRequestID=" + newRequestID); return true; }else{ LOGGER.info("加鎖失敗,超過重試次數,直接返回 retryCount={}",retryCount); return false; } } } /** * 解鎖 * */ private void unlock(RedisLock annotation){ DistributeLock distributeLock = RedisDistributeLock.getInstance(); String requestID = REQUEST_ID_MAP.get(); if(requestID != null){ // 解鎖成功 boolean unLockSuccess = distributeLock.unLock(annotation.lockKey(),requestID); if(unLockSuccess){ // 移除 ThreadLocal中的數據 REQUEST_ID_MAP.remove(); LOGGER.info("解鎖成功 requestID=" + requestID); } } } }
使用例子
@Service("testService") public class TestServiceImpl implements TestService { @Override @RedisLock(lockKey = "lockKey", expireTime = 100, retryCount = RedisLock.UN_LIMIT_RETRY) public String method1() { return "method1"; } @Override @RedisLock(lockKey = "lockKey", expireTime = 100, retryCount = 3) public String method2() { return "method2"; } }
5.總結
5.1 當前版本缺陷
主從同步可能導致鎖的互斥性失效
在redis主從結構下,出於性能的考慮,redis採用的是主從非同步複製的策略,這會導致短時間內主庫和從庫數據短暫的不一致。
試想,當某一客戶端剛剛加鎖完畢,redis主庫還沒有來得及和從庫同步就掛了,之後從庫中新選拔出的主庫是沒有對應鎖記錄的,這就可能導致多個客戶端加鎖成功,破壞了鎖的互斥性。
休眠並反覆嘗試加鎖效率較低
lockAndRetry方法在客戶端線程加鎖失敗後,會休眠一段時間之後再進行重試。當鎖的持有者持有鎖的時間很長時,其它客戶端會有大量無效的重試操作,造成系統資源的浪費。
進一步優化時,可以使用發佈訂閱的方式。這時加鎖失敗的客戶端會監聽鎖被釋放的信號,在鎖真正被釋放時