書接上回 大數據量、高併發業務怎麼優化?(一) 文章中介紹了非同步批處理的三種方式,本文繼續深入針對前兩種進行講解,並給出代碼示例: 一 普通版本,採用阻塞隊列 ArrayBlockingQueue 使用普通方式能夠直接基於JDK中現成的併發包 ArrayBlockingQueue 提供的 offer ...
書接上回 大數據量、高併發業務怎麼優化?(一) 文章中介紹了非同步批處理的三種方式,本文繼續深入針對前兩種進行講解,並給出代碼示例:
一 普通版本,採用阻塞隊列 ArrayBlockingQueue
使用普通方式能夠直接基於JDK中現成的併發包 ArrayBlockingQueue
提供的 offer(E e, long timeout, TimeUnit unit)
(添加元素到隊列尾部,如果隊列已滿則等待參數指定時間後返回false)方法 和 poll(long timeout, TimeUnit unit)
(從隊列頭部獲取元素,如果隊列為空則等待參數指定時間後返回null)方法,來達到非同步批處理效果
生產者代碼:由於採用記憶體隊列,最好在創建 ArrayBlockingQueue
時指定隊列大小,防止隊列無界,導致記憶體溢出
/**
* 生產者
*/
@Component
@Slf4j
public class MonitorQueue {
private BlockingQueue<List<NodeCollectDTO>> queue = new ArrayBlockingQueue<>(10000000);
public void put(List<NodeCollectDTO> list) {
try {
queue.put(list);
} catch (InterruptedException e) {
log.error(String.format("隊列put異常:%s", e.getMessage()), e);
}
}
public void offer(List<NodeCollectDTO> list, long timeout, TimeUnit unit) throws InterruptedException {
queue.offer(list, timeout, unit);
}
public List<NodeCollectDTO> poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
}
消費者代碼:在創建生產者時開啟一個子線程在死迴圈中一直讀取隊列元素,直到隊列元素超過我們的 maxNum
時,將臨時列表元素插入資料庫中
/**
* 消費者
*/
@Slf4j
@Component
public class MonitorConsumer implements Runnable {
@Autowired
private MonitorQueue queue;
@Autowired
private MonitorService monitorService;
@PostConstruct
public void init() {
new Thread(this, "monitor-collect").start();
}
// 臨時列表大小限制
private int maxNum = 2000;
@SuppressWarnings("InfiniteLoopStatement")
@Override
public void run() {
while (true) {
handler();
}
}
private void handler() {
try {
List<NodeCollectDTO> temp = new ArrayList<>(maxNum);
while (temp.size() <= maxNum) {
List<NodeCollectDTO> list = queue.poll(20, TimeUnit.SECONDS);
if (CollectionUtil.isNotEmpty(list)) {
temp.addAll(list);
} else {
break;
}
}
if (CollectionUtil.isEmpty(temp)) {
return;
}
int i = monitorService.batchSave(temp);
log.debug("----------------------------batchSave num:{}, collect.size:{}", i, collect.size());
} catch (Exception e) {
log.error(String.format("消費者異常: %s", e.getMessage()), e);
}
}
}
可以看到採用該種方式實現的非同步批量入庫代碼比較簡單,便於理解,在性能上,基本都能夠滿足日常普通業務存在的批量入庫場景
二 進階版,採用 Disruptor
隊列,本文基於 Disruptor
最新4.0版本
先給出 Disruptor
官網簡介
Disruptor 是一個提供併發環形緩衝區數據結構的庫。它旨在在非同步事件處理架構中提供低延遲、高吞吐量的工作隊列。
為了理解 Disruptor 的好處,我們可以將它與一些很好理解且目的非常相似的東西進行比較。在 Disruptor 的情況下,這將是 Java 的 BlockingQueue。與隊列一樣,Disruptor 的目的是在同一進程內的線程之間移動數據(例如消息或事件)。然而,Disruptor 提供的一些關鍵特性使其有別於隊列。他們是:
向消費者多播事件,帶有消費者依賴圖。
為事件預分配記憶體。
可選無鎖
Disruptor
給我們在項目中實現非同步批處理提供了另一種方式,一種無鎖、延遲更低、吞吐量更高、提供消費者多播等等的記憶體隊列
下麵介紹如何使用
2.1 依賴安裝
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>4.0.0.RC1</version>
</dependency>
2.2 Disruptor
使用代碼如下:
public class LongEvent{
private long value;
public void set(long value){
this.value = value;
}
@Override
public String toString(){
return "LongEvent{" + "value=" + value + '}';
}
}
@Slf4j
public class LongEventMain {
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
event.set(buffer.getLong(0));
}
public static void main(String[] args) throws Exception {
int bufferSize = 128;
// 1. 創建Disruptor對象
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
// 2. 添加事件處理類(消費者)
disruptor.handleEventsWith(LongEventMain::handleEvent);
// 3. 開啟事件處理線程
disruptor.start();
// 4. 獲取ringBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
// 5. 發佈事件(生產者)
ringBuffer.publishEvent(LongEventMain::translate, bb);
Thread.sleep(1);
}
}
}
2.3 上面代碼完成了一個事件發佈後,事件處理類就能夠收到對應事件信息的功能,但是我們想要的是能在消費者線程中批量處理生產者數據的邏輯,還得再修改一下事件處理類代碼,如下:
@Slf4j
public class LongEventBatch implements EventHandler<LongEvent> {
private static final int MAX_BATCH_SIZE = 20;
private final List<LongEvent> batch = new ArrayList<>();
public LongEventBatch() {
// 虛擬機關閉處理
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("------------------ShutdownHook-DataEventHandler,上報tempList");
if (batch.size() > 0) {
// 批量入庫偽代碼
int i = xxxService.batchSave(temp);
}
}));
}
@Override
public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) {
log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);
batch.add(event);
if (batch.size() >= MAX_BATCH_SIZE) {
processBatch(batch);
}
}
private void processBatch(final List<LongEvent> batch) {
// 批量入庫偽代碼
int i = xxxService.batchSave(temp);
// 記得清空batch列表
batch.clear();
}
}
由此,我們就實現了基於 Disruptor
的非同步批處理邏輯,該方式會比普通版本性能高出一個數量級,大家在工作中可以嘗試使用一番
最後
附博主 github
地址 https://github.com/wayn111