前言 秒殺架構到後期,我們採用了消息隊列的形式實現搶購邏輯,那麼之前拋出過這樣一個問題:消息隊列非同步處理完每個用戶請求後,如何通知給相應用戶秒殺成功? 場景映射 首先,我們舉一個生活中比較常見的例子:我們去銀行辦理業務,一般會選擇相關業務列印一個排號紙,然後就可以坐在小板凳上玩著手機,等待被小喇叭報 ...
前言
秒殺架構到後期,我們採用了消息隊列的形式實現搶購邏輯,那麼之前拋出過這樣一個問題:消息隊列非同步處理完每個用戶請求後,如何通知給相應用戶秒殺成功?
場景映射
首先,我們舉一個生活中比較常見的例子:我們去銀行辦理業務,一般會選擇相關業務列印一個排號紙,然後就可以坐在小板凳上玩著手機,等待被小喇叭報號。當小喇叭喊到你所持有的號碼,就可以拿著排號紙去櫃臺辦理自己的業務。
這裡,假設當我們取排號紙的時候,銀行根據時間段內的排隊情況,比較人性化的提示用戶:排隊人數較多,您是否繼續等待?否的話我們可以換個時間段再來辦理。
由此我們把生活場景映射到真實的秒殺業務邏輯中來:
- 我們可以把櫃臺比喻成商品下單處理邏輯單元
- 拿到排號紙說明你進入相應商品處理隊列
- 拿到排號紙的請求直接返回前臺,提示用戶搶購進行中
- 排號紙進入隊列後,等待商品業務處理邏輯
- 小喇叭叫到自己的排號相當於服務端通知用戶秒殺成功,這時候可以進行支付邏輯
- 那些拿不到票號的同學,相當於隊列已滿直接返回秒殺失敗
解決方案
通過上面的場景,我們很容易能夠想到一種方案就是服務端通知,那麼如何做到服務端非同步通知的呢?下麵,主角開始登場了,就是我們的Websocket。
WebSocket是HTML5開始提供的一種瀏覽器與伺服器間進行全雙工通訊的網路技術。依靠這種技術可以實現客戶端和伺服器端的長連接,雙向實時通信。
特點:
- 非同步、事件觸發
- 可以發送文本,圖片等流文件
- 數據格式比較輕量,性能開銷小,通信高效
- 使用ws或者wss協議的客戶端socket,能夠實現真正意義上的推送功能
缺點:
- 部分瀏覽器不支持,瀏覽器支持的程度與方式有區別,需要各種相容寫法。
集成案例
由於我們的秒殺架構項目案例中使用了SpringBoot,因此集成webSocket也是相對比較簡單的。
首先pom.xml引入以下依賴:
<!-- webSocket 秒殺通知-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebSocketConfig 配置:
/**
* WebSocket配置
* 創建者 爪哇筆記
* 創建時間 2018年5月29日
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
WebSocketServer 配置:
@ServerEndpoint("/websocket/{userId}")
@Component
public class WebSocketServer {
private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
//靜態變數,用來記錄當前線上連接數。應該把它設計成線程安全的。
private static int onlineCount = 0;
//concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//與某個客戶端的連接會話,需要通過它來給客戶端發送數據
private Session session;
//接收userId
private String userId="";
/**
* 連接建立成功調用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //線上數加1
log.info("有新視窗開始監聽:"+userId+",當前線上人數為" + getOnlineCount());
this.userId=userId;
try {
sendMessage("連接成功");
} catch (IOException e) {
log.error("websocket IO異常");
}
}
/**
* 連接關閉調用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //從set中刪除
subOnlineCount(); //線上數減1
log.info("有一連接關閉!當前線上人數為" + getOnlineCount());
}
/**
* 收到客戶端消息後調用的方法
* @param message 客戶端發送過來的消息*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到來自視窗"+userId+"的信息:"+message);
//群發消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("發生錯誤");
error.printStackTrace();
}
/**
* 實現伺服器主動推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群發自定義消息
* */
public static void sendInfo(String message,@PathParam("userId") String userId){
log.info("推送消息到視窗"+userId+",推送內容:"+message);
for (WebSocketServer item : webSocketSet) {
try {
//這裡可以設定只推送給這個userId的,為null則全部推送
if(userId==null) {
item.sendMessage(message);
}else if(item.userId.equals(userId)){
item.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
KafkaConsumer 消費配置,通知用戶是否秒殺成功:
/**
* 消費者 spring-kafka 2.0 + 依賴JDK8
* @author 科幫網 By https://blog.52itstyle.com
*/
@Component
public class KafkaConsumer {
@Autowired
private ISeckillService seckillService;
private static RedisUtil redisUtil = new RedisUtil();
/**
* 監聽seckill主題,有消息就讀取
* @param message
*/
@KafkaListener(topics = {"seckill"})
public void receiveMessage(String message){
//收到通道的消息之後執行秒殺操作
String[] array = message.split(";");
if(redisUtil.getValue(array[0])!=null){//control層已經判斷了,其實這裡不需要再判斷了
Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));
if(result.equals(Result.ok())){
WebSocketServer.sendInfo(array[0].toString(), "秒殺成功");//推送給前臺
}else{
WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺
redisUtil.cacheValue(array[0], "ok");//秒殺結束
}
}else{
WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺
}
}
}
webSocket.js 前臺通知邏輯:
$(function(){
socket.init();
});
var basePath = "ws://localhost:8080/seckill/";
socket = {
webSocket : "",
init : function() {
//userId:自行追加
if ('WebSocket' in window) {
webSocket = new WebSocket(basePath+'websocket/1');
}
else if ('MozWebSocket' in window) {
webSocket = new MozWebSocket(basePath+"websocket/1");
}
else {
webSocket = new SockJS(basePath+"sockjs/websocket");
}
webSocket.onerror = function(event) {
alert("websockt連接發生錯誤,請刷新頁面重試!")
};
webSocket.onopen = function(event) {
};
webSocket.onmessage = function(event) {
var message = event.data;
alert(message)//判斷秒殺是否成功、自行處理邏輯
};
}
}
客戶端API
客戶端與伺服器通信
- send() 向遠程伺服器發送數據
- close() 關閉該websocket鏈接
監聽函數
- onopen 當網路連接建立時觸發該事件
- onerror 當網路發生錯誤時觸發該事件
- onclose 當websocket被關閉時觸發該事件
- onmessage 當websocket接收到伺服器發來的消息的時觸發的事件,也是通信中最重要的一個監聽事件。
readyState屬性
這個屬性可以返回websocket所處的狀態。
- CONNECTING(0) websocket正嘗試與伺服器建立連接
- OPEN(1) websocket與伺服器已經建立連接
- CLOSING(2) websocket正在關閉與伺服器的連接
- CLOSED(3) websocket已經關閉了與伺服器的連接
開源方案
goeasy
GoEasy實時Web推送,支持後臺推送和前臺推送兩種:後臺推送可以選擇Java SDK、 Restful API支持所有開發語言;前臺推送:JS推送。無論選擇哪種方式推送代碼都十分簡單(10分鐘可搞定)。由於它支持websocket 和polling兩種連接方式所以兼顧大多數主流瀏覽器,低版本的IE瀏覽器也是支持的。
地址:http://goeasy.io/
Pushlets
Pushlets 是通過長連接方式實現“推”消息的。推送模式分為:Poll(輪詢)、Pull(拉)。
地址:http://www.pushlets.com/
Pushlet
Pushlet 是一個開源的 Comet 框架,Pushlet 使用了觀察者模型:客戶端發送請求,訂閱感興趣的事件;伺服器端為每個客戶端分配一個會話 ID 作為標記,事件源會把新產生的事件以多播的方式發送到訂閱者的事件隊列里。
地址:https://github.com/wjw465150/Pushlet
總結
其實前面有提過,儘管WebSocket有諸多優點,但是,如果服務端維護很多長連接也是挺耗費資源的,伺服器集群以及覽器或者客戶端相容性問題,也會帶來了一些不確定性因素。大體瞭解了一下各大廠的做法,大多數都還是基於輪詢的方式實現的,比如:騰訊PC端微信掃碼登錄、京東商城支付成功通知等等。
有些小伙伴可能會問了,輪詢豈不是會更耗費資源?其實在我看來,有些輪詢是不可能穿透到後端資料庫查詢服務的,比如秒殺,一個緩存標記位就可以判定是否秒殺成功。相對於WS的長連接以及其不確定因素,在秒殺場景下,輪詢還是相對比較合適的。
思考
最後,思考一個問題:100件商品,假如有一萬人進行搶購,該如何設置隊列長度?
秒殺案例:https://gitee.com/52itstyle/spring-boot-seckill
參考
https://blog.52itstyle.com/archives/736/
https://www.xoriant.com/blog/mobility/websocket-web-stateful-now.html