1. execute 和 submit 的區別 前面說了還需要介紹多線程中使用 execute 和 submit 的區別(這兩個方法都是線程池 ThreadPoolExecutor 的方法)。 1.1 方法來源不同 execute 方法是線程池的頂層介面 Executor 定義的,在 ThreadP ...
1. execute
和 submit
的區別
前面說了還需要介紹多線程中使用 execute
和 submit
的區別(這兩個方法都是線程池 ThreadPoolExecutor
的方法)。
1.1 方法來源不同
execute
方法是線程池的頂層介面 Executor
定義的,在 ThreadPoolExecutor
中實現:
void execute(Runnable command);
submit()
是在ExecutorService
介面中定義的,並定義了三種重載方式:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
在AbstractExecutorService
類中有它們的具體實現,而ThreadPoolExecutor
繼承了AbstractExecutorService
類。所以 ThreadPoolExecutor
也有這三個方法。
1.2 接收參數不同
從上面的方法來源中可以看出,二者接收參數類型不同:
execute()
方法只能接收實現Runnable
介面類型的任務submit()
方法則既可以接收Runnable
類型的任務,也可以接收Callable
類型的任務
1.3 返回值不同
由於 Runnable
和 Callable
的區別就是,Runnable
無返回值,Callable
有返回值。
所以 execute
和 submit
的返回值也不同。
-
execute()
的返回值是void
,線程提交後不能得到線程的返回值 -
submit()
的返回值是Future
,通過Future的get()
方法可以獲取到線程執行的返回值,get()
方法是同步的,執行get()方法時,如果線程還沒執行完,會同步等待,直到線程執行完成雖然submit()方法可以提交Runnable類型的參數,但執行Future方法的get()時,線程執行完會返回null,不會有實際的返回值,這是因為Runable本來就沒有返回值
1.4 異常處理機制不同
- 用
submit
提交任務,任務內有異常也不會列印異常信息,而是調用get()
方法時,列印出任務執行異常信息 - 用
execute
提交任務時,任務內有異常會直接列印出來
後面源碼分析中會體現這個不同點!
2. execute
和 submit
源碼分析
2.1 submit
源碼分析
submit
方法是 ExecutorService
介面定義,由 AbstractExecutorService
抽象類實現,有三個重載方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看一下上面 submit
的三個重載方法,方法體很相似,都調用了一個方法 newTaskFor(...)
,那麼就來看看這個方法,可以看到它有兩個重載方法:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
解釋一下上面兩個重載方法吧:
- 第一個
newTaskFor(Runnable runnable, T value)
:可以看到它應該是將submit
方法傳進來的Runnable
轉化成了Callable
,並給一個返回值 - 第二個
newTaskFor(Callable<T> callable)
:就是submit
直接傳進了一個Callable
,包裝成FutureTask
返回。
上面代碼中可以看一下 RunnableFuture
和 FutureTask
的關係:
先看一下 RunnableFuture:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
RunnableFuture 實現了 Runnable 和 Future,它的子類就是 FutureTask
public class FutureTask<V> implements RunnableFuture<V> {
// ...
}
到這裡就明白了吧,當 submit 傳入的參數是 Runnable 的時候,就需要 FutureTask的構造方法將 Runnable 轉化成 Callable
下麵看一下 FutureTask
的兩個構造函數:
// 傳入Runnable則是執行了一共方法,看一下這個方法,具體轉化邏輯就有了
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
// 傳入Callable直接賦值給類的成員變數
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
下麵看一下當 submit
傳入 Runnable
的時候,其實到這裡就是調用了 FutureTask(Runnable runnable, V result)
構造函數,看一下這個構造函數中將 Runable
轉化成了 Callable
,看一下 Executors.callable(runnable, result)
方法:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
看看這裡有創建了一個類,就是 RunnableAdapter,下麵再看一下這個內部類:
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
看看這個內部類,實現了 Callable 介面,併在 call 方法中 調用了 task 的 run 方法,就是相當於任務代碼直接在 call 方法中了。
這裡必須說一下線程池中的線程是怎麼執行的,這裡就不說全部了,直說與這裡相關的一部分:
- 看到上面
submit
方法最終也是調用了execute
方法,經過上main源碼分析的一系列轉換,submit
最終調用了ThreadPoolExecutor
的execute
方法 execute
方法裡面有一個很關鍵的方法是addWorker(command, true)
- 進入
addWorker
方法,可以看到裡面 new 了一個Worker
。Worker
裡面有一個屬性是Thread
,後面直接調用了它的start
方法啟動了線程 - 可以看一下
Worker
類,它實現了Runnable
,這裡就要看看Worker
的run
方法了,調用了runWorker
- 在
runWorker
方法中,有一行是task.run()
,調用submit
時最終這個run方法就是RunnableFuture
中的run()
方法。具體實現在FutureTask
中
下麵就看一下 FutureTask
中實現的 run
方法:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//從這裡可以看到,調用了call()方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 看看,這裡是將異常放在了一個屬性中,所以 submit執行的時候不會拋出異常,只有在調用 get 方法時才會拋出異常
setException(ex);
}
if (ran)
//這裡將返回值設置到了outcome,執行完後可以通過get()獲取
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
看一下上面兩個主要的方法(setException(ex)
和 set(result)
):
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
可以看到兩個方法一個是將異常放進了 outcome ,一個是將 call
方法的返回值放進了 outcome
。不管是異常還是線程執行的返回值,都會在get
方法中獲取到,下麵看一下get
方法,方法在 FutureTask
類中:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
看看get
方法調用了 report
方法,取到了上面setException(ex)
和 set(result)
方法 放進 outcome
的值並返回。
這裡如果線程拋出了異常,這個線程會被從線程哈希表中移除,取消強引用,讓 GC
回收,並且重新創建一個新的線程。
到這裡 submit
方法的源碼就分析完了!!!
2.2 execute
源碼分析
此方法是線程頂層介面 Executor
定義的,在 ThreadPoolExecutor
有其實現,直接看實現:
其實 submit
方法最終調用了 execute
也是這一段,不同的是最後調用的線程的 run
方法是不同實現類實現的
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
這裡主要就是看 submit
和 execute
的區別點,所以線程池的看具體源碼就不看了,我之前寫的有一篇線程池源碼的筆記很詳細:線程池源碼
上面有個很重要的方法,是將線程加入隊列並執行的,就是 addWorker
方法,這裡就不copy addWorker
的代碼了,只需要知道裡面創建了一個 Worker
對象即可。Worker
對象中有一個屬性是 Thread
,後面獲取到了這個 Thread
,並執行了 start
方法。
然而在 Worker
本身是實現了 Runnable
的,所以後面執行的 start
方法,實際是執行了 Worker
中的 run
方法:
public void run() {
runWorker(this);
}
看看 Worker
中run
方法調用的 runWorker
方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 這個地方是重點,run 方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
其實這裡的源碼就和 submit
一樣了,只是上面 task.run()
調用的是不同實現類的 run
方法。
execute
方法傳進來的最終是調用的Runnable
或其子類的run
方法submit
方法進來的最終是調用了FutureTask
的run
方法
基於上面的區別,再去看 FutureTask
中 run
方法的源碼,就可以知道一下結論:
execute
是沒返回值的,submit
有返回值- 用
execute
提交任務時,任務內有異常會直接列印出來;用submit
提交任務,任務內有異常也不會列印異常信息,而是調用get()
方法時,列印出任務執行異常信息