時間輪演算法 時間輪是一種高效、低延遲的調度數據結構。其在Linux內核中廣泛使用,是Linux內核定時器的實現方法和基礎之一。按使用場景,大致可以分為兩種時間輪:原始時間輪和分層時間輪。分層時間輪是原始時間輪的升級版本,來應對時間“槽”數量比較大的情況,對記憶體和精度都有很高要求的情況。延遲任務的場景 ...
時間輪演算法
時間輪是一種高效、低延遲的調度數據結構。其在Linux內核中廣泛使用,是Linux內核定時器的實現方法和基礎之一。按使用場景,大致可以分為兩種時間輪:原始時間輪和分層時間輪。分層時間輪是原始時間輪的升級版本,來應對時間“槽”數量比較大的情況,對記憶體和精度都有很高要求的情況。延遲任務的場景一般只需要用到原始時間輪就可以了。
代碼案例
推薦使用Netty
提供的HashedWheelTimer
工具類來實現延遲任務。
引入依賴:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.23.Final</version>
</dependency>
紅包過期隊列信息:
/**
* 紅包過期隊列信息
*/
public class RedPacketTimerTask implements TimerTask {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
/**
* 紅包 ID
*/
private final long redPacketId;
/**
* 創建時間戳
*/
private final long timestamp;
public RedPacketTimerTask(long redPacketId) {
this.redPacketId = redPacketId;
this.timestamp = System.currentTimeMillis();
}
@Override
public void run(Timeout timeout) {
//非同步處理任務
System.out.println(String.format("任務執行時間:%s,紅包創建時間:%s,紅包ID:%s",
LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), redPacketId));
}
}
測試用例:
/**
* 基於 netty 的時間輪演算法 HashedWheelTimer 實現的延遲任務
*/
public class RedPacketHashedWheelTimer {
private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public static void main(String[] args) throws Exception {
ThreadFactory factory = r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("RedPacketHashedWheelTimerWorker");
return thread;
};
/**
* @param tickDuration - 每tick一次的時間間隔
* @param unit - tickDuration 的時間單位
* @param ticksPerWheel - 時間輪中的槽數
* @param leakDetection - 檢查記憶體溢出
*/
Timer timer = new HashedWheelTimer(factory, 1,
TimeUnit.SECONDS, 100,true);
System.out.println(String.format("開始任務時間:%s",LocalDateTime.now().format(F)));
for(int i=1;i<10;i++){
TimerTask timerTask = new RedPacketTimerTask(i);
timer.newTimeout(timerTask, i, TimeUnit.SECONDS);
}
Thread.sleep(Integer.MAX_VALUE);
}
}
列印任務執行日誌:
開始任務時間:2020-02-12 15:22:23.404
任務執行時間:2020-02-12 15:22:25.410,紅包創建時間:2020-02-12 15:22:23.409,紅包ID:1
任務執行時間:2020-02-12 15:22:26.411,紅包創建時間:2020-02-12 15:22:23.414,紅包ID:2
任務執行時間:2020-02-12 15:22:27.424,紅包創建時間:2020-02-12 15:22:23.414,紅包ID:3
任務執行時間:2020-02-12 15:22:28.410,紅包創建時間:2020-02-12 15:22:23.414,紅包ID:4
任務執行時間:2020-02-12 15:22:29.411,紅包創建時間:2020-02-12 15:22:23.414,紅包ID:5
任務執行時間:2020-02-12 15:22:30.409,紅包創建時間:2020-02-12 15:22:23.414,紅包ID:6
任務執行時間:2020-02-12 15:22:31.411,紅包創建時間:2020-02-12 15:22:23.414,紅包ID:7
任務執行時間:2020-02-12 15:22:32.409,紅包創建時間:2020-02-12 15:22:23.414,紅包ID:8
任務執行時間:2020-02-12 15:22:33.411,紅包創建時間:2020-02-12 15:22:23.414,紅包ID:9
源碼相關
其核心是workerThread
線程,主要負責每過tickDuration
時間就累加一次tick
。同時也負責執行到期的timeout
任務以及添加timeout
任務到指定的wheel
中。
構造方法:
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
this.tickDuration = unit.toNanos(tickDuration);
// Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
//這裡-爪窪筆記
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
新增任務,創建即啟動:
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
//這裡-爪窪筆記
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
線程啟動:
/**
* Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
執行相關操作:
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
do {
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
小結
以上方案並沒有實現持久化和分散式,生產環境可根據實際業務需求選擇使用。
源碼
https://gitee.com/52itstyle/spring-boot-seckill