Java中ScheduleThreadPoolExecutor主要用於執行延遲任務或者按照一定的頻率執行任務。其中scheduleAtFixedRate函數是按照一定頻率執行任務,scheduleWithFixedDelay可以根據延遲一定時間再執行任務。本文將參考ScheduleThreadPoo ...
Java中ScheduleThreadPoolExecutor主要用於執行延遲任務或者按照一定的頻率執行任務。其中scheduleAtFixedRate函數是按照一定頻率執行任務,scheduleWithFixedDelay可以根據延遲一定時間再執行任務。本文將參考ScheduleThreadPoolExecutor的源碼來剖析其為什麼能夠支持延遲並按照固定頻率執行任務。
ScheduleThreadPoolExecutor之所以能夠延遲並按照一定頻率執行任務主要依靠其內部封裝的兩個內部類,ScheduledFutureTask和DelayedWorkQueue。其中ScheduledFutureTask繼承了FutureTask類,因此其可以封裝繼承了Runable或Callable介面的任務。而DelayedWorkQueue則為一個延遲隊列,其利用最小堆實現,需要首先執行的任務在堆頂,這樣每次執行任務時只需要獲取堆頂的任務即可。
ScheduledFutureTask:
ScheduledFutureTask繼承了FutureTask,因此其能夠被ScheduledExecutorService執行。下麵看一下ScheduledFutureTask的一些重要屬性:
- int heapIndex:表示改任務在DelayedWorkQueue隊列中的索引。由於DelayedWorkQueue是由最小堆構建的,為了提高查找的速度,在封裝的Task里引入該欄位,可以使查找時間複雜度降到O(1)。
- private long time:表示該任務執行的時間,在getDelay()函數中根據unit.convert(time - now(), NANOSECONDS)來獲得任務還需要多長時間才能執行。同時DelayedWorkQueue中也是根據該欄位來維護最小堆的。
- private final long period:執行重覆任務的時間。正數表示按照一定的速率執行任務,負數表示按照一定的延遲執行任務,0表示任務不重覆執行。
- RunnableScheduledFuture outerTask = this:指定該任務。
- private final long sequenceNumber:任務進入隊列的順序,保證隊列的FIFO
對於ScheduledFutureTask的方法,最主要的是compareTo和getDelay和setNextRunTime方法。
-
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
該方法主要是用來獲得任務需要執行時的延遲時間,其在DelayedWorkQueue中的offer函數中有重要的運用。
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
compareTo方法主要用於DelayQueue中最小堆的排序,其首先根據任務執行時間來判斷,如果任務執行時間相同則按照隊列的FIFO規則進行判斷。
-
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
setNextRunTIme()方法主要是為需要重覆執行的任務設置下次執行的時間,當period > 0時表示任務是按照一定速率執行的,只需要將本次執行時間加上間隔時間即可。當period < 0時表示任務是延期執行的,需要調用triggerTime來獲得下次執行時間。下麵是triggerTime函數的實現:
long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } /** * Constrains the values of all delays in the queue to be within * Long.MAX_VALUE of each other, to avoid overflow in compareTo. * This may occur if a task is eligible to be dequeued, but has * not yet been, while some other task is added with a delay of * Long.MAX_VALUE. */ private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
由上面的代碼可知,對於延遲執行的任務,執行時間是當前時間加上延遲時間。而為了防止在conpareTo進行比較時數值過大,延遲時間又是根據隊列中下一個要執行的任務的執行時間來獲得。下一篇講介紹DelayQueue的詳細實現。