功能03-優惠券秒殺04 4.功能03-優惠券秒殺 4.7Redis優化秒殺 4.7.1優化分析 現在來回顧一下優惠券秒殺業務的兩個主要問題: (1)首先是對優惠券的扣減,需要防止庫存超賣現象; (2)其次,需要對每個用戶下單數量進行限制,實現一人一單的功能。 處理秒殺優惠券的業務: 先根據獲取到的 ...
功能03-優惠券秒殺04
4.功能03-優惠券秒殺
4.7Redis優化秒殺
4.7.1優化分析
現在來回顧一下優惠券秒殺業務的兩個主要問題:
(1)首先是對優惠券的扣減,需要防止庫存超賣現象;
(2)其次,需要對每個用戶下單數量進行限制,實現一人一單的功能。
處理秒殺優惠券的業務:
-
先根據獲取到的優惠券id,先到資料庫中判斷是否存在,若存在;
-
再判斷優惠券是否在設定的有效期,如果是,則進行一人一單的業務處理:
-
2.1 利用分散式鎖,key存儲的是order+用戶id:當同一時間,一個用戶發起了多個線程請求,其中的某個線程獲取到了鎖,由於互斥性,無論這個用戶發起了多少個請求,只有一個線程能進入接下來的業務。(不同用戶發起的不同線程之間不影響)
-
2.2 接下來,查詢該用戶是否已經買過這張秒殺券了,如果買過了,則不允許重覆購買,如果是第一次購買,就進入到防止超賣的業務:
- 2.2.1 到這一步可能會有多個用戶的單個線程進入這個業務,為了防止超賣問題,這裡使用樂觀鎖方案。樂觀鎖的關鍵是判斷之前查詢到的數據是否有被修改過,但缺點是失敗率高,因此我們又使用了mysql的行鎖解決。(詳見day05-優惠券秒殺01)
-
因為整個過程有很多對資料庫的操作(查詢優惠券、查詢訂單、減庫存、創建訂單),因此這個業務的性能並不是很好:
優化前:
上述業務看似複雜,實際上只有兩個過程:(1)對於用戶資格的校驗:庫存夠不夠,該用戶買沒買過(一人一單)(2)然後才是真正的下單業務。
我們可以對這兩個過程進行分離,別分使用兩個線程進行操作:主線程負責對用戶購買資格的校驗,如果有購買的資格,再開啟一個獨立的線程,來處理耗時較久的減庫存和創建訂單操作。
為了提高效率,使用redis判斷秒殺庫存和校驗一人一單,如果校驗通過,則redis會記錄優惠券信息、用戶信息、訂單信息到阻塞隊列。一方面:tomcat伺服器去讀取這個隊列的信息,完成下單。另一方面:redis給用戶返回一個訂單號,代表該用戶搶單成功,用戶可以根據這個訂單號去付款。
優化後:
這樣,整個秒殺流程就變為:直接在redis中判斷用戶的秒殺資格和庫存,然後將信息保存到隊列里。
秒殺業務的流程變短了,而且是基於Redis,性能得到很大的提升,整個業務的吞吐能力、併發能力可以大大提高了。
那麼,如何在Redis中完成對秒殺庫存的判斷和一人一單的判斷呢?
首先是對數據的存儲:
- 使用String類型,key存儲 業務首碼+秒殺券id,value保存優惠券對應的庫存;
- 因為要保證一人一單,使用set類型,key保存業務首碼+秒殺券id,value保存下單的用戶id,保證元素不可重覆。
優化後,在Redis中需要執行的具體流程:
非同步秒殺優化總結:
上述的優化操作,一方面縮短了秒殺業務的流程,從而大大提高了秒殺業務的併發;另一方面,redis的操作和資料庫的操作是非同步的,對資料庫操作的時效性不再要求那麼高了,減輕了資料庫的壓力。
4.7.2代碼實現
改進秒殺業務,提高併發性能。需求:
- 新增秒殺優惠券的同時,將優惠券信息保存到Redis中
- 基於Lua腳本,判斷秒殺庫存、一人一單,決定用戶是否搶占成功
- 如果搶占成功,將優惠券id和用戶id封裝後存入阻塞隊列
- 開啟線程任務,不斷地從阻塞隊列中獲取信息,實現非同步下單功能
需求1:新增秒殺優惠券的同時,將優惠券信息保存到Redis中
(1.1)修改IVoucherService
package com.hmdp.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.hmdp.dto.Result;
import com.hmdp.entity.Voucher;
/**
* 服務類
*
* @author 李
* @version 1.0
*/
public interface IVoucherService extends IService<Voucher> {
void addSeckillVoucher(Voucher voucher);
}
(1.2)修改VoucherServiceImpl
package com.hmdp.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.entity.Voucher;
import com.hmdp.mapper.VoucherMapper;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherService;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import static com.hmdp.utils.RedisConstants.SECKILL_STOCK_KEY;
/**
* 服務實現類
*
* @author 李
* @version 1.0
*/
@Service
public class VoucherServiceImpl extends ServiceImpl<VoucherMapper, Voucher> implements IVoucherService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存優惠券到資料庫
save(voucher);
// 保存秒殺優惠券信息到資料庫
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
//保存秒殺庫存到Redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
}
(1.3)修改VoucherController
package com.hmdp.controller;
import com.hmdp.dto.Result;
import com.hmdp.entity.Voucher;
import com.hmdp.service.IVoucherService;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* 前端控制器
*
* @author 李
* @version 1.0
*/
@RestController
@RequestMapping("/voucher")
public class VoucherController {
@Resource
private IVoucherService voucherService;
/**
* 新增秒殺券
* @param voucher 優惠券信息,包含秒殺信息
* @return 優惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}
}
(1.4)使用postman進行測試,返回結過顯示插入成功,data為插入的秒殺券的id
資料庫和Redis中也分別插入成功了:
需求2:基於Lua腳本,判斷秒殺庫存、一人一單,決定用戶是否搶占成功
在resources目錄下新建一個Lua腳本
seckill.lua:
-- 1.參數列表
-- 1.1 優惠券id
local voucherId = ARGV[1]
-- 1.2 用戶id
local userId = ARGV[2]
-- 2.數據key
-- 2.1 庫存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 訂單key
local orderKey = 'seckill:order' .. voucherId
-- 3.腳本業務
-- 3.1判斷庫存是否充足 get stockKey
if (tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2庫存不足,返回1
return 1
end
-- 3.3庫存充足,判斷用戶是否下過單(判斷用戶id是否在訂單key對應的集合中)
-- sismember orderKey userId
if (redis.call('sismember', orderKey, userId) == 1) then
-- 3.4 若存在,說明是重覆下單,返回2
return 2
end
-- 3.5 扣庫存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.6 下單(保存用戶) sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
需求3:如果搶占成功,將優惠券id和用戶id封裝後存入阻塞隊列
需求4:開啟線程任務,不斷地從阻塞隊列中獲取信息,實現非同步下單功能
修改VoucherOrderServiceImpl:
- 請求先來到seckillVoucher()方法,該方法先調用lua腳本,嘗試判斷用戶有沒有購買資格、庫存是否充足。如果有,創建訂單,放到阻塞對列中。此時整個秒殺業務就結束了,用戶可以得到結果。
- 創建阻塞隊列和線程池,在類初始化的時候就執行線程池。線程池的業務就是不斷地從阻塞隊列中獲取訂單信息,然後創建訂單(調用handleVoucherOrder()方法)
package com.hmdp.service.impl;
import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.*;
/**
* 服務實現類
*
* @author 李
* @version 1.0
*/
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
//類一載入就初始化腳本
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
//阻塞隊列:當一個線程嘗試從隊列中獲取元素時,如果隊列中沒有元素,那麼該線程就會被阻塞,直到隊列中有元素,線程才會被喚醒並獲取元素
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
//線程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//在當前類初始化完畢之後就執行
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
//執行非同步操作,從阻塞隊列中獲取訂單
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
//1.獲取隊列中的訂單信息
/* take()--獲取和刪除阻塞對列中的頭部,如果需要則等待直到元素可用
(因此不必擔心這裡的死迴圈會增加cpu的負擔) */
VoucherOrder voucherOrder = orderTasks.take();
//2.創建訂單
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("處理訂單異常", e);
}
}
}
}
private IVoucherOrderService proxy;
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//獲取用戶(因為目前的是線程池對象,不是主線程,不能使用UserHolder從ThreadLocal中獲取用戶id)
Long userId = voucherOrder.getUserId();
//創建鎖對象,指定鎖的名稱
RLock lock = redissonClient.getLock("lock:order:" + userId);
//獲取鎖(可重入鎖)
boolean isLock = lock.tryLock();
//判斷是否獲取鎖成功
if (!isLock) {
//獲取鎖失敗
log.error("不允許重覆下單");
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
//釋放鎖
lock.unlock();
}
}
@Override
public Result seckillVoucher(Long voucherId) {
//獲取用戶id
Long userId = UserHolder.getUser().getId();
//1.執行lua腳本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
//2.判斷腳本執行結果是否為0
int r = result.intValue();
if (r != 0) {
//2.1如果不為0,代表沒有購買資格
return Result.fail(r == 1 ? "庫存不足" : "不能重覆下單");
}
//2.2如果為0,代表有購買資格,將下單信息保存到阻塞對列中
VoucherOrder voucherOrder = new VoucherOrder();
//設置訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//設置用戶id
voucherOrder.setUserId(userId);
//設置秒殺券id
voucherOrder.setVoucherId(voucherId);
//將上述信息保存到阻塞隊列
orderTasks.add(voucherOrder);
//3.獲取代理對象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//4.返回訂單id
return Result.ok(0);
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//一人一單
Long userId = voucherOrder.getUserId();
//查詢訂單
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {//說明已經該用戶已經對該優惠券下過單了
log.error("用戶已經購買過一次!");
return;
}
//庫存充足,則扣減庫存(操作秒殺券表)
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")//set stock = stock -1
//where voucher_id =? and stock>0
.gt("stock", 0).eq("voucher_id", voucherOrder.getVoucherId()).update();
if (!success) {//操作失敗
log.error("秒殺券庫存不足!");
return;
}
//將訂單寫入資料庫(操作優惠券訂單表)
save(voucherOrder);
}
}
重啟項目,進行測試:
(1)初始數據:
(2)使用jemeter進行測試:使用1000個不同的用戶同時向伺服器發送搶購秒殺券的請求
測試結果:可以看到平均響應實現為216毫秒,最小值為17毫秒,比之前平均500毫秒的響應時間縮短了一半。
4.7.3秒殺優化總結
(1)秒殺業務的優化思路是什麼?
- 先利用Redis完成庫存餘量判斷、一人一單判斷,完成搶單業務
- 再將下單業務放入阻塞隊列,利用獨立線程非同步下單
(2)基於阻塞隊列的非同步秒殺存在哪些問題?
-
記憶體限制問題:
這裡我們使用的是JDK裡面的阻塞隊列,它使用的是JVM裡面的記憶體。如果不加以限制,在高併發的情況下,可能會有非常多的訂單對象需要去創建,放入阻塞隊列中,可能會導致記憶體溢出。雖然我們限制了隊列的長度,但是如果隊列存滿了,再有新的訂單來,就放不下了。
-
數據安全問題:
現在的代碼基於記憶體來保存訂單信息,如果伺服器宕機了,那麼阻塞隊列中的所有訂單信息將會丟失
我們將在接下來的分析中對上述兩個問題進行解決。
4.8Redis消息隊列實現非同步秒殺
要解決上面的兩個問題,最佳的解決方案就是使用消息隊列。
4.8.1什麼是消息隊列
消息隊列(Message Queue,簡稱MQ),字面意思就是存放消息的隊列。最簡單的消息隊列模型包括3個角色:
- 消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker)
- 生產者:發送消息到消息隊列
- 消費者:從消息隊列獲取消息並處理消息
4.8.2消息隊列實現非同步秒殺的優勢
使用消息隊列實現非同步秒殺的優勢:
- 消息隊列是JVM以外的獨立服務,不受JVM記憶體的限制,這就解決了之前的記憶體限制問題
- 消息隊列不僅僅是做數據存儲,它還要確保數據安全,即消息隊列里的所有消息都要做持久化,這樣不管是服務宕機還是重啟,數據都不會丟失
- 消息隊列將消息投遞給消費者之後,要求消費者做消息的確認。如果消息沒有被確認,這個消息就會在隊列中依然存在,下一次會再次投遞給消費者,直到收到消息確認為止。
當下比較知名的消息引擎,包括:ActiveMQ、RabbitMQ、Kafka、RocketMQ、Artemis 等
這裡使用Redis實現消息隊列:
Redis提供了三種不同的方式來實現消息隊列:
- list結構:基於List結構模擬消息隊列
- PubSub:基本的點對點消息模型
- Stream:比較完善的消息隊列模型
4.8.3基於List結構模擬的消息隊列
Redis的List數據結構是一個雙向鏈表,很容易模擬出隊列效果。隊列是入口和出口不在一邊,我們可以利用:LPUSH結合RPOP、或者RPUSH結合LPOP來實現。
不過要註意的是,當隊列中沒有消息時,RPOP或LPOP操作會返回null,並不像JVM的阻塞隊列那樣會阻塞並等待消息。因此這裡應該使用BRPOP或者BLPOP來實現阻塞效果。
BRPOP key [key ...] timeout
summary: Remove and get the last element in a list, or block until one is available
since: 2.0.0
RPOP key
summary: Remove and get the last element in a list
since: 1.0.0
基於List的消息隊列有哪些優缺點?
優點:
- 利用Redis存儲,不受限於JVM記憶體上限
- 基於Redis的持久化機制,數據安全性有保證
- 可以滿足消息有序性
缺點:
- 無法避免消息丟失
- 只支持單消費者
4.8.4基於PubSub的消息隊列
PubSub(發佈訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或者多個channel,生產者向對應channel發送消息後,所有訂閱者都能收到相關消息。
- SUBSCRIBE channel [channel]:訂閱一個或者多個頻道
- PUBLISH channel msg:向一個頻道發送消息
- PSUBSCRIBE pattern [pattern]:訂閱與pattern格式相匹配的所有頻道
基於PubSub的消息隊列有哪些優缺點?
優點:採用發佈訂閱模型,支持多生產、多消費
缺點:
- 不支持數據持久化
- 無法避免消息丟失
- 消息堆積有上限,超出時數據丟失
4.8.5基於Stream的消息隊列
Stream是Redis5.0引入的一種新的數據類型,可以實現一個功能非常完善的消息隊列。
發送消息的命令: