用python爬取並分析《2021胡潤百富榜》的榜單數據! 1、python爬蟲講解(requests向介面請求)。 2、python數據分析講解(pandas數據分析及可視化畫圖)含:直方圖、柱形圖、餅圖、詞雲圖等。 ...
實體類
為了方便測試,直接在測試類中的寫內部類:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderInfo {
/**
* 訂單id
*/
private Integer id;
/**
* 描述:用來記錄關閉時間,可以在測試時用來驗證。關閉時間是否跟 expireTime相等
*/
private String description;
/**
* 創建時間
*/
private LocalDateTime createTime;
/**
* 過期時間:關閉時間
*/
private LocalDateTime expireTime;
}
生成訂單
模擬生成訂單並設置過期時間。
執行時會在redis創建2個key:
- redisson_delay_queue:{
<closeKey>
} :訂單數據 - redisson_delay_queue_timeout:{
<closeKey>
} :zset類型,按時間戳排序
/**
* 創建訂單,並設置過期時間
*
* @throws IOException
*/
@Test
void createOrder() {
RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
// 100條訂單
int n = 100;
Random random = new Random();
for (int i = 0; i < n; i++) {
// 1~100之間的正整數
int i1 = random.nextInt(100) + 1;
LocalDateTime now = LocalDateTime.now();
delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, now, now.plusSeconds(i1)), i1, TimeUnit.SECONDS);
}
}
關閉訂單
關閉訂單,這裡會產生訂閱。redis會出現redisson_delay_queue_channel
。
/**
* 關閉訂單
*
* @throws IOException
*/
@Test
void closeOrder() {
ReentrantLock lock = new ReentrantLock();
// 5個線程
int poolSize = 5;
List<CompletableFuture<Void>> futureList = new ArrayList<>();
for (int i = 0; i < poolSize; i++) {
futureList.add(CompletableFuture.runAsync(() -> {
RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
// 加入監聽
redissonClient.getDelayedQueue(blockingDeque);
while (true) {
OrderInfo take;
try {
take = blockingDeque.take();
} catch (Exception e) {
continue;
}
if (take == null) {
continue;
}
// 驗證多次是否會重覆關閉。正常里不會近,只是驗證下。正式環境,可以刪除
try {
lock.lock();
if(closed.contains(take.getId())){
log.info("測試是否會搶占:已存在其他線程處理關閉訂單[{}]", take.getId());
}
closed.add(take.getId());
}finally {
lock.unlock();
}
// 處理訂單關閉邏輯
log.info("訂單[{}]關閉中。。。", take.getId());
log.info("訂單[{}]已關閉!order={}", take.getId(), toJsonString(take));
}
}));
}
// 模擬正式環境中進程一直在運行,因為test時,沒有join則會只執行一次出現消費完數據後進程就關閉了
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
}
完整測試類:
package cn.skyjilygao.demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static cn.skyjilygao.util.EntityUtil.toJsonString;
@Slf4j
@SpringBootTest
public class CloseOrderTests {
@Autowired
private RedissonClient redissonClient;
public static String closeKey = "order_close_test";
public volatile static Set<Integer> closed = new ConcurrentSkipListSet<>();
/**
* 創建訂單,並設置過期時間
*
* @throws IOException
*/
@Test
void createOrder() {
RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
int a = 100;
Random random = new Random(100);
for (int i = 0; i < a; i++) {
int i1 = random.nextInt(1 + i) + 1;
delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, LocalDateTime.now(), LocalDateTime.now().plusSeconds(i1)), i1, TimeUnit.SECONDS);
}
}
/**
* 關閉訂單
*
* @throws IOException
*/
@Test
void closeOrder() {
ReentrantLock lock = new ReentrantLock();
// 5個線程
int poolSize = 5;
List<CompletableFuture<Void>> futureList = new ArrayList<>();
for (int i = 0; i < poolSize; i++) {
futureList.add(CompletableFuture.runAsync(() -> {
RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
// 加入監聽
redissonClient.getDelayedQueue(blockingDeque);
while (true) {
OrderInfo take;
try {
take = blockingDeque.take();
} catch (Exception e) {
continue;
}
if (take == null) {
continue;
}
try {
lock.lock();
if(closed.contains(take.getId())){
log.info("測試是否會搶占:已存在其他線程處理關閉訂單[{}]", take.getId());
}
closed.add(take.getId());
}finally {
lock.unlock();
}
log.info("訂單[{}]關閉中。。。", take.getId());
log.info("訂單[{}]已關閉!order={}", take.getId(), toJsonString(take));
}
}));
}
// 模擬正式環境中進程一直在運行,因為test時,沒有join則會只執行一次出現消費完數據後進程就關閉了
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderInfo {
private Integer id;
private String description;
private LocalDateTime createTime;
private LocalDateTime expireTime;
}
}