對於一些併發量不是很高的場景,使用MySQL來實現會比較精簡且巧妙。 下麵就一個小例子,針對不加鎖、樂觀鎖以及悲觀鎖這三種方式來實現。 主要是一個用戶表,它有一個年齡的欄位,然後併發地對其加一,看看結果是否正確。 ...
目錄
背景
對於一些併發量不是很高的場景,使用MySQL來實現分散式鎖會比較精簡且巧妙。
下麵就一個小例子,針對不加鎖、樂觀鎖以及悲觀鎖這三種方式來實現。
主要是一個用戶表,它有一個年齡的欄位,然後併發地對其加一,看看結果是否正確。
一些基礎實現類
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
private Integer age;
private String name;
private Long id;
private Long version;
}
public interface UserMapper {
@Results(value = {
@Result(property = "id", column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "age", column = "age", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "version", column = "version", javaType = Long.class, jdbcType = JdbcType.BIGINT),
})
@Select("SELECT id, age, name, version FROM user WHERE id = #{id}")
User getUser(Long id);
@Update("UPDATE user SET age = #{age}, version=version+1 WHERE id = #{id} AND version = #{version}")
Boolean compareAndSetAgeById(Long id, Long version, Integer age);
@Update("UPDATE user SET age = #{age} WHERE id = #{id}")
Boolean setAgeById(Long id, Integer age);
@Select("SELECT id, age, name, version FROM user WHERE id = #{id} for update")
User getUserForUpdate(Long id);
}
private static void exe(CountDownLatch countDownLatch, int threads, Runnable runnable) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threads; i++) {
executorService.execute(() -> {
runnable.run();
countDownLatch.countDown();
});
}
executorService.shutdown();
}
private static User getUser(long id) {
try (SqlSession session = openSession(getDataSource1())) {
UserMapper userMapper = session.getMapper(UserMapper.class);
return userMapper.getUser(id);
}
}
public static SqlSession openSession(DataSource dataSource) {
TransactionFactory transactionFactory = new JdbcTransactionFactory();
Environment environment = new Environment("development", transactionFactory, dataSource);
Configuration configuration = new Configuration(environment);
configuration.addMapper(UserMapper.class);
configuration.setCacheEnabled(false);
SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
return sqlSessionFactory.openSession();
}
private static DataSource getDataSource1() {
MysqlConnectionPoolDataSource dataSource = new MysqlConnectionPoolDataSource();
dataSource.setUser("root");
dataSource.setPassword("root");
dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8");
return dataSource;
}
不加鎖
private static Boolean addAge(long id, int value) {
Boolean result;
try (SqlSession session = openSession(getDataSource1())) {
UserMapper userMapper = session.getMapper(UserMapper.class);
User user = userMapper.getUser(id);
result = userMapper.setAgeById(user.getId(), user.getAge() + value);
session.commit();
}
return result;
}
public static void main(String[] args) throws Exception {
long id = 1L;
int threads = 50;
CountDownLatch countDownLatch = new CountDownLatch(threads);
log.info("user:{}", getUser(id));
long start = System.currentTimeMillis();
exe(countDownLatch, threads, () -> addAge(1, 1));
countDownLatch.await();
long end = System.currentTimeMillis();
log.info("end - start : {}", end - start);
log.info("user:{}", getUser(id));
}
865 [main] INFO cn.eagle.li.mybatis.UpdateMain - user:User(age=0, name=3, id=1, version=0)
1033 [main] INFO cn.eagle.li.mybatis.UpdateMain - end - start : 164
1046 [main] INFO cn.eagle.li.mybatis.UpdateMain - user:User(age=9, name=3, id=1, version=0)
從輸出可以看出,50個併發,但是執行成功的只有9個,這種實現很明顯是有問題的。
樂觀鎖
private static Boolean compareAndAddAge(long id, int value, int times) {
int time = 0;
Boolean result = false;
while (time++ < times && BooleanUtils.isFalse(result)) {
result = compareAndAddAge(id, value);
}
return result;
}
private static Boolean compareAndAddAge(long id, int value) {
try (SqlSession session = openSession(getDataSource1())) {
UserMapper userMapper = session.getMapper(UserMapper.class);
User user = userMapper.getUser(id);
Boolean result = userMapper.compareAndSetAgeById(id, user.getVersion(), user.getAge() + value);
session.commit();
return result;
}
}
public static void main(String[] args) throws Exception {
long id = 1L;
int threads = 50;
CountDownLatch countDownLatch = new CountDownLatch(threads);
log.info("user:{}", getUser(id));
long start = System.currentTimeMillis();
exe(countDownLatch, threads, () -> addAge(1, 1, 20));
countDownLatch.await();
long end = System.currentTimeMillis();
log.info("end - start : {}", end - start);
log.info("user:{}", getUser(id));
}
758 [main] INFO cn.eagle.li.mybatis.UpdateMain - user:User(age=0, name=3, id=1, version=0)
1270 [main] INFO cn.eagle.li.mybatis.UpdateMain - end - start : 509
1277 [main] INFO cn.eagle.li.mybatis.UpdateMain - user:User(age=50, name=3, id=1, version=50)
從輸出可以看出,併發的情況下,結果是沒問題的。
悲觀鎖
private static Boolean addAgeForUpdate(long id, int value) {
Boolean result;
try (SqlSession session = openSession(getDataSource1())) {
UserMapper userMapper = session.getMapper(UserMapper.class);
User user = userMapper.getUserForUpdate(id);
result = userMapper.setAgeById(id, user.getAge() + value);
session.commit();
}
return result;
}
public static void main(String[] args) throws Exception {
long id = 1L;
int threads = 50;
CountDownLatch countDownLatch = new CountDownLatch(threads);
log.info("user:{}", getUser(id));
long start = System.currentTimeMillis();
exe(countDownLatch, threads, () -> addAgeForUpdate(1, 1));
countDownLatch.await();
long end = System.currentTimeMillis();
log.info("end - start : {}", end - start);
log.info("user:{}", getUser(id));
}
631 [main] INFO cn.eagle.li.mybatis.UpdateMain - user:User(age=0, name=3, id=1, version=50)
829 [main] INFO cn.eagle.li.mybatis.UpdateMain - end - start : 196
837 [main] INFO cn.eagle.li.mybatis.UpdateMain - user:User(age=50, name=3, id=1, version=50)
從輸出可以看出,併發的情況下,結果是沒問題的。
總結
從以上來看,樂觀鎖和悲觀鎖實現都是沒有問題的,至於選哪一種,還是要看業務的場景,比如說併發量的多少,加鎖時長等等。