前言 微信紅包業務,發紅包之後如果24小時之內沒有被領取完就自動過期失效。 架構設計 業務流程 老闆發紅包,此時緩存初始化紅包個數,紅包金額(單位分),並非同步入庫。 紅包數據入延遲隊列,唯一標識+失效時間 紅包數據出延遲隊列,根據唯一標識清空紅包緩存數據、非同步更新資料庫、非同步退回紅包金額 代碼案例 ...
前言
微信紅包業務,發紅包之後如果24小時之內沒有被領取完就自動過期失效。
架構設計
業務流程
老闆發紅包,此時緩存初始化紅包個數,紅包金額(單位分),並非同步入庫。
紅包數據入延遲隊列,唯一標識+失效時間
紅包數據出延遲隊列,根據唯一標識清空紅包緩存數據、非同步更新資料庫、非同步退回紅包金額
代碼案例
這裡我們使用Java
內置的DelayQueue
來實現,DelayQueue
是一個無界的BlockingQueue
,用於放置實現了Delayed
介面的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。
老闆發了10個紅包一共200人民幣,假裝只有9個人搶紅包。
發紅包,緩存數據進入延遲隊列:
/**
* 有人沒搶 紅包發多了
* 紅包進入延遲隊列
* 實現過期失效
* @param redPacketId
* @return
*/
@ApiOperation(value="搶紅包三",nickname="爪哇筆記")
@PostMapping("/startThree")
public Result startThree(long redPacketId){
int skillNum = 9;
final CountDownLatch latch = new CountDownLatch(skillNum);//N個搶紅包
/**
* 初始化紅包數據,搶紅包攔截
*/
redisUtil.cacheValue(redPacketId+"-num",10);
/**
* 初始化紅包金額,單位為分
*/
redisUtil.cacheValue(redPacketId+"-money",20000);
/**
* 加入延遲隊列 24s秒過期
*/
RedPacketMessage message = new RedPacketMessage(redPacketId,24);
RedPacketQueue.getQueue().produce(message);
/**
* 模擬 9個用戶搶10個紅包
*/
for(int i=1;i<=skillNum;i++){
int userId = i;
Runnable task = () -> {
/**
* 搶紅包 判斷剩餘金額
*/
Integer money = (Integer) redisUtil.getValue(redPacketId+"-money");
if(money>0){
Result result = redPacketService.startTwoSeckil(redPacketId,userId);
if(result.get("code").toString().equals("500")){
LOGGER.info("用戶{}手慢了,紅包派完了",userId);
}else{
Double amount = DoubleUtil.divide(Double.parseDouble(result.get("msg").toString()), (double) 100);
LOGGER.info("用戶{}搶紅包成功,金額:{}", userId,amount);
}
}
latch.countDown();
};
executor.execute(task);
}
try {
latch.await();
Integer restMoney = Integer.parseInt(redisUtil.getValue(redPacketId+"-money").toString());
LOGGER.info("剩餘金額:{}",restMoney);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Result.ok();
}
紅包隊列消息:
/**
* 紅包隊列消息
*/
public class RedPacketMessage implements Delayed {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 預設延遲3秒
*/
private static final long DELAY_MS = 1000L * 3;
/**
* 紅包 ID
*/
private final long redPacketId;
/**
* 創建時間戳
*/
private final long timestamp;
/**
* 過期時間
*/
private final long expire;
/**
* 描述信息
*/
private final String description;
public RedPacketMessage(long redPacketId, long expireSeconds) {
this.redPacketId = redPacketId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + expireSeconds * 1000L;
this.description = String.format("紅包[%s]-創建時間為:%s,超時時間為:%s", redPacketId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public RedPacketMessage(long redPacketId) {
this.redPacketId = redPacketId;
this.timestamp = System.currentTimeMillis();
this.expire = this.timestamp + DELAY_MS;
this.description = String.format("紅包[%s]-創建時間為:%s,超時時間為:%s", redPacketId,
LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
}
public long getRedPacketId() {
return redPacketId;
}
public long getTimestamp() {
return timestamp;
}
public long getExpire() {
return expire;
}
public String getDescription() {
return description;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
紅包延遲隊列:
/**
* 紅包延遲隊列
*/
public class RedPacketQueue {
/** 用於多線程間下單的隊列 */
private static DelayQueue<RedPacketMessage> queue = new DelayQueue<>();
/**
* 私有的預設構造子,保證外界無法直接實例化
*/
private RedPacketQueue(){}
/**
* 類級的內部類,也就是靜態的成員式內部類,該內部類的實例與外部類的實例
* 沒有綁定關係,而且只有被調用到才會裝載,從而實現了延遲載入
*/
private static class SingletonHolder{
/**
* 靜態初始化器,由JVM來保證線程安全
*/
private static RedPacketQueue queue = new RedPacketQueue();
}
//單例隊列
public static RedPacketQueue getQueue(){
return SingletonHolder.queue;
}
/**
* 生產入隊
* 1、執行加鎖操作
* 2、把元素添加到優先順序隊列中
* 3、查看元素是否為隊首
* 4、如果是隊首的話,設置leader為空,喚醒所有等待的隊列
* 5、釋放鎖
*/
public Boolean produce(RedPacketMessage message){
return queue.add(message);
}
/**
* 消費出隊
* 1、執行加鎖操作
* 2、取出優先順序隊列元素q的隊首
* 3、如果元素q的隊首/隊列為空,阻塞請求
* 4、如果元素q的隊首(first)不為空,獲得這個元素的delay時間值
* 5、如果first的延遲delay時間值為0的話,說明該元素已經到了可以使用的時間,調用poll方法彈出該元素,跳出方法
* 6、如果first的延遲delay時間值不為0的話,釋放元素first的引用,避免記憶體泄露
* 7、判斷leader元素是否為空,不為空的話阻塞當前線程
* 8、如果leader元素為空的話,把當前線程賦值給leader元素,然後阻塞delay的時間,即等待隊首到達可以出隊的時間,在finally塊中釋放leader元素的引用
* 9、迴圈執行從1~8的步驟
* 10、如果leader為空並且優先順序隊列不為空的情況下(判斷還有沒有其他後續節點),調用signal通知其他的線程
* 11、執行解鎖操作
*/
public RedPacketMessage consume() throws InterruptedException {
return queue.take();
}
}
紅包延遲隊列過期消費,監聽任務:
/**
* 紅包延遲隊列過期消費
*/
@Component("redPacket")
public class TaskRunner implements ApplicationRunner {
private final static Logger LOGGER = LoggerFactory.getLogger(TaskRunner.class);
@Autowired
private RedisUtil redisUtil;
ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName("RedPacketDelayWorker");
thread.setDaemon(true);
return thread;
});
@Override
public void run(ApplicationArguments var){
executorService.execute(() -> {
while (true) {
try {
RedPacketMessage message = RedPacketQueue.getQueue().consume();
if(message!=null){
long redPacketId = message.getRedPacketId();
LOGGER.info("紅包{}過期了",redPacketId);
/**
* 獲取剩餘紅包個數以及金額
*/
int num = (int) redisUtil.getValue(redPacketId+"-num");
int restMoney = (int) redisUtil.getValue(redPacketId+"-money");
LOGGER.info("剩餘紅包個數{},剩餘紅包金額{}",num,restMoney);
/**
* 清空紅包數據
*/
redisUtil.removeValue(redPacketId+"-num");
redisUtil.removeValue(redPacketId+"-money");
/**
* 非同步更新資料庫、非同步退回紅包金額
*/
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
適用場景
淘寶訂單到期,下單成功後60s之後給用戶發送簡訊通知,限時支付、緩存系統等等。
演示
在Application
中有介面演示說明,你可以在搶紅包 Red Packet Controller
介面中輸入任何參數進行測試,也可以配合資料庫稍加修改即可作為生產環境的搶紅包功能模塊。
源碼
https://gitee.com/52itstyle/spring-boot-seckill