Fork/Join框架簡介 Fork/Join框架簡介 Fork/Join它可以將一個大的任務拆分成多個子任務並行處理,最後將子任務結果合成並最後的計算結果,併進行輸出。FOrk/Join框架要完成兩件事情。Fork:把一個複雜的任務進行分析,大任務拆分成小任務;Join:把拆分的結果進行合併。 1 ...
Fork/Join框架簡介
Fork/Join框架簡介
Fork/Join它可以將一個大的任務拆分成多個子任務並行處理,最後將子任務結果合成並最後的計算結果,併進行輸出。FOrk/Join框架要完成兩件事情。Fork:把一個複雜的任務進行分析,大任務拆分成小任務;Join:把拆分的結果進行合併。
1.任務分割,Fork/Join框架需要把大的任務分割成足夠小的任務,如果子任務比較大的話還要對子任務進行繼續分割。
2.執行任務併合並結果,分割的子任務分別放到雙端隊列里,然後幾個啟動線程,分別從雙端隊列里獲取任務執行。子任務執行完後的結果都放在另一個隊列里,啟動一個線程從隊列里取數據,然後合併這些數據。
雙端隊列
雙端隊列(Double-ended Queue),簡稱Deque,是一種具有隊列和棧特性的數據結構。雙端隊列允許從兩端插入和刪除元素,因此可以在隊列的頭部和尾部進行插入和和刪除操作。
雙端隊列可以在隊列的兩端執行以下操作:
- 在隊列頭部插入/刪除元素;
- 在隊列尾部插入/刪除元素;
雙端隊列的特點是可以在隊列的兩端進行插入和刪除操作,這使得它非常靈活和方便。它可以用於模擬棧,隊列和其他數據結構,同時也可以用於解決一些特定的問題,如滑動視窗問題等。在Java中,Deque介面是雙端隊列的標準實現,它有兩個主要的實現類:ArrayDeque,LinkedList。
Fork/Join框架實現任務分割和合併
- 1.ForkJoinaTask:要使用Fork/Join框架,首先要創建一個ForkJoin任務,該類提供了在任務中執行fork/join的機制。通常情況下,我們不使用ForkJoin的子類ForkJoinTask,只需要繼承ForkJoinTask子類,然後使用,Fork/join框架提供了兩個子類,RecursiveAction 用以沒有返回結果的任務,RecursiveTask 用以有返回結果的任務。
- 2.ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。
- 3.RecursiveTask:繼承此類後,可以自定義調用的任務
ForkJoinPool架構圖
RecursiveTask架構圖
Fork/Join實現原理
ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責存放任務,以及將這些任務提交給ForkJoinPool,而ForkJoinWorkerThread負責執行這些任務。
Fork方法的實現原理
當我們調用ForkJoinTask的fork方法時,程式會把任務放在ForkJoinWorkerThread的pushTask的workQueue中,非同步地執行這個任務,然後立即返回結果。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
push方法把當前任務存放在ForkJoinTask數組隊列里。然後再調用ForkJoinPool的signalWork()方法喚醒或創建一個工作線程執行任務。
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);//執行任務
}
else if (n >= m)
growArray();
}
}
Join方法
Join方法的主要作用是阻塞當前線程並等待獲取結果。
/**
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.
*
* @return the computed result
*/
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)//執行doJoin() 根據返回狀態判斷返回結果
reportException(s);
return getRawResult();
}
任務狀態有4種 已完成(NORMAL),被取消(CANCELLED),信號(SIGNAL)和出現異常(EXCEPTIONAL)
doJoin方法
/**
Implementation for join, get, quietlyJoin. Directly handles only cases of already-completed, external wait, * and unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
* Returns: status upon completion
*/
/*
流程
1.當任務狀態,已經執行完成,就直接返回任務狀態
2.如果沒有執行完,則從任務數組裡取出任務並執行。
3.如果任務順利執行完成,則設置任務狀態未NORMAL,出現異常,則記錄異常,並將任務狀態設置為EXCEPTIONAL
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
/**
* Primary execution method for stolen tasks. Unless done, calls exec and records status if completed, but * doesn't wait for completion otherwise.
* Returns: status on exit from this method
*/
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
Fork/Join框架的異常處理
ForkJoinTask在執行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,並且可以通過ForkJoinTask的getException獲取異常。
public final Throwable getException() {//任務沒有完成,或者沒有拋出異常返回null
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() ://任務取消,返回CancellationException
getThrowableException());
}
案例計算1+..+100
class MyTask extends RecursiveTask<Integer> {
public static final Integer VALUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if (begin - end < VALUE) {//差值小於10
for (int i = begin; i <= end; i++) {
result += i;
}
} else {
int middle = ( begin + end) / 2;
MyTask myTask1 = new MyTask(begin, middle);
MyTask myTask2 = new MyTask(middle, end);
myTask1.fork();
myTask2.fork();
result = myTask1.join() + myTask2.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
MyTask myTask = new MyTask(1, 100);
ForkJoinTask<Integer> joinTask = forkJoinPool.submit(myTask);
System.out.println(joinTask.get());
forkJoinPool.shutdown();
}
}
只是為了記錄自己的學習歷程,且本人水平有限,不對之處,請指正。