一、ThreadPoolExecutor類講解 1、線程池狀態: 五種狀態: 線程池的shutdown() 方法,將線程池由 RUNNING(運行狀態)轉換為 SHUTDOWN狀態 線程池的shutdownNow()方法,將線程池由RUNNING 或 SHUTDOWN 狀態轉換為 STOP 狀態。 ...
一、ThreadPoolExecutor類講解
1、線程池狀態:
五種狀態:
- 線程池的
shutdown()
方法,將線程池由 RUNNING(運行狀態)轉換為 SHUTDOWN狀態 - 線程池的
shutdownNow()
方法,將線程池由RUNNING 或 SHUTDOWN 狀態轉換為 STOP 狀態。
註:
SHUTDOWN
狀態 和 STOP 狀態 先會轉變為TIDYING
狀態,最終都會變為TERMINATED
2、ThreadPoolExecutor構造函數:
ThreadPoolExecutor
繼承自AbstractExecutorService
,而AbstractExecutorService
實現了ExecutorService
介面。
接下來我們分別講解這些參數的含義。
2.1)線程池工作原理:
corePoolSize
:線程池中核心線程數的最大值maximumPoolSize
:線程池中能擁有最多線程數workQueue
:用於緩存任務的阻塞隊列
當調用線程池execute()
方法添加一個任務時,線程池會做如下判斷:
- 如果有空閑線程,則直接執行該任務;
- 如果沒有空閑線程,且當前運行的線程數少於
corePoolSize
,則創建新的線程執行該任務; - 如果沒有空閑線程,且當前的線程數等於
corePoolSize
,同時阻塞隊列未滿,則將任務入隊列,而不添加新的線程; - 如果沒有空閑線程,且阻塞隊列已滿,同時池中的線程數小於
maximumPoolSize
,則創建新的線程執行任務; - 如果沒有空閑線程,且阻塞隊列已滿,同時池中的線程數等於
maximumPoolSize
,則根據構造函數中的 handler 指定的策略來拒絕新的任務。
2.2)KeepAliveTime:
keepAliveTime
:表示空閑線程的存活時間TimeUnit unit
:表示keepAliveTime的單位
當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大於 corePoolSize
,那麼這個線程就被停掉。所以線程池的所有任務完成後,它最終會收縮到 corePoolSize
的大小。
註:如果線程池設置了
allowCoreThreadTimeout
參數為true(預設false),那麼當空閑線程超過keepaliveTime
後直接停掉。(不會判斷線程數是否大於corePoolSize
)即:最終線程數會變為0。
2.3)workQueue 任務隊列:
workQueue
:它決定了緩存任務的排隊策略ThreadPoolExecutor
線程池推薦了三種等待隊列,它們是:SynchronousQueue
、LinkedBlockingQueue
和ArrayBlockingQueue
。
1)有界隊列:
SynchronousQueue
:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於 阻塞狀態,吞吐量通常要高於LinkedBlockingQueue
,靜態工廠方法Executors.newCachedThreadPool
使用了這個隊列。ArrayBlockingQueue
:一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致類似阻塞。
2)無界隊列:
LinkedBlockingQueue
:基於鏈表結構的無界阻塞隊列,它可以指定容量也可以不指定容量(實際上任何無限容量的隊列/棧都是有容量的,這個容量就是Integer.MAX_VALUE
)PriorityBlockingQueue
:是一個按照優先順序進行內部元素排序的無界阻塞隊列。隊列中的元素必須實現 Comparable 介面,這樣才能通過實現compareTo()
方法進行排序。優先順序最高的元素將始終排在隊列的頭部;PriorityBlockingQueue
不會保證優先順序一樣的元素的排序。
註意:
keepAliveTime
和maximumPoolSize
及BlockingQueue
的類型均有關係。如果BlockingQueue
是無界的,那麼永遠不會觸發maximumPoolSize
,自然keepAliveTime
也就沒有了意義。
2.4)threadFactory:
threadFactory
:指定創建線程的工廠。(可以不指定)
如果不指定線程工廠時,ThreadPoolExecutor
會使用ThreadPoolExecutor.defaultThreadFactory
創建線程。預設工廠創建的線程:同屬於相同的線程組,具有同為 Thread.NORM_PRIORITY
的優先順序,以及名為 “pool-XXX-thread-
” 的線程名(XXX為創建線程時順序序號),且創建的線程都是非守護進程。
2.5)handler 拒絕策略:
handler
:表示當 workQueue
已滿,且池中的線程數達到 maximumPoolSize
時,線程池拒絕添加新任務時採取的策略。(可以不指定)
最科學的的還是 AbortPolicy 提供的處理方式:拋出異常,由開發人員進行處理。
3、常用方法:
除了在創建線程池時指定上述參數的值外,還可線上程池創建以後通過如下方法進行設置。
此外,還有一些方法:
getCorePoolSize()
:返回線程池的核心線程數,這個值是一直不變的,返回在構造函數中設置的coreSize大小;getMaximumPoolSize()
:返回線程池的最大線程數,這個值是一直不變的,返回在構造函數中設置的coreSize大小;getLargestPoolSize()
:記錄了曾經出現的最大線程個數(水位線);getPoolSize()
:線程池中當前線程的數量;getActiveCount()
:Returns the approximate(近似) number of threads that are actively executing tasks;prestartAllCoreThreads()
:會啟動所有核心線程,無論是否有待執行的任務,線程池都會創建新的線程,直到池中線程數量達到 corePoolSize;prestartCoreThread()
:會啟動一個核心線程(同上);allowCoreThreadTimeOut(true)
:允許核心線程在KeepAliveTime時間後,退出;
4、Executors類:
Executors類的底層實現便是ThreadPoolExecutor!Executors 工廠方法有:
Executors.newCachedThreadPool()
:無界線程池,可以進行自動線程回收Executors.newFixedThreadPool(int)
:固定大小線程池Executors.newSingleThreadExecutor()
:單個後臺線程
它們均為大多數使用場景預定義了設置。不過在阿裡java文檔中說明,儘量不要用該類創建線程池。
二、線程池相關介面介紹:
1、ExecutorService介面:
該介面是真正的線程池介面。上面的ThreadPoolExecutor
以及下麵的ScheduledThreadPoolExecutor
都是該介面的實現類。改介面常用方法:
Future<?> submit(Runnable task)
:提交Runnable任務到線程池,返回Future對象,由於Runnable沒有返回值,也就是說調用Future對象get()方法返回null;<T> Future<T> submit(Callable<T> task)
:提交Callable任務到線程池,返回Future對象,調用Future對象get()方法可以獲取Callable的返回值;<T> Future<T> submit(Runnable task,T result)
:提交Runnable任務到線程池,返回Future對象,調用Future對象get()方法可以獲取Runnable的參數值;invokeAll(collection of tasks)/invokeAll(collection of tasks, long timeout, TimeUnit unit)
:invokeAll會按照任務集合中的順序將所有的Future添加到返回的集合中,該方法是一個阻塞的方法。只有當所有的任務都執行完畢時,或者調用線程被中斷,又或者超出指定時限時,invokeAll方法才會返回。當invokeAll返回之後每個任務要麼返回,要麼取消,此時客戶端可以調用get/isCancelled來判斷具體是什麼情況。invokeAny(collection of tasks)/invokeAny(collection of tasks, long timeout, TimeUnit unit)
:阻塞的方法,不會返回 Future 對象,而是返回集合中某一個Callable 對象的結果,而且無法保證調用之後返回的結果是哪一個 Callable,如果一個任務運行完畢或者拋出異常,方法會取消其它的 Callable 的執行。和invokeAll區別是只要有一個任務執行完了,就把結果返回,並取消其他未執行完的任務;同樣,也帶有超時功能;shutdown()
:在完成已提交的任務後關閉服務,不再接受新任;shutdownNow()
:停止所有正在執行的任務並關閉服務;isTerminated()
:測試是否所有任務都執行完畢了;isShutdown()
:測試是否該ExecutorService已被關閉。
1.1)submit方法示例:
我們知道,線程池介面中有以下三個主要方法,接下來我們看一下具體示例:
1)Callable:
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 50, 300, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(50),
new ThreadFactory(){ public Thread newThread(Runnable r) {
return new Thread(r, "schema_task_pool_" + r.hashCode());
}}, new ThreadPoolExecutor.DiscardOldestPolicy());
public static void callableTest() {
int a = 1;
//callable
Future<Boolean> future = threadPool.submit(new Callable<Boolean>(){
@Override
public Boolean call() throws Exception {
int b = a + 100;
System.out.println(b);
return true;
}
});
try {
System.out.println("feature.get");
Boolean boolean1 = future.get();
System.out.println(boolean1);
} catch (InterruptedException e) {
System.out.println("InterruptedException...");
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("execute exception...");
e.printStackTrace();
}
}
2)Runnable:
public static void runnableTest() {
int a = 1;
//runnable
Future<?> future1 = threadPool.submit(new Runnable(){
@Override
public void run() {
int b = a + 100;
System.out.println(b);
}
});
try {
System.out.println("feature.get");
Object x = future1.get(900,TimeUnit.MILLISECONDS);
System.out.println(x);//null
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("execute exception...");
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
3)Runnable+result:
class RunnableTask implements Runnable {
Person p;
RunnableTask(Person p) {
this.p = p;
}
@Override
public void run() {
p.setId(1);
p.setName("Runnable Task...");
}
}
class Person {
private Integer id;
private String name;
public Person(Integer id, String name) {
super();
this.id = id;
this.name = name;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person [id=" + id + ", name=" + name + "]";
}
}
public static void runnableTest2() {
//runnable + result
Person p = new Person(0,"person");
Future<Person> future2 = threadPool.submit(new RunnableTask(p),p);
try {
System.out.println("feature.get");
Person person = future2.get();
System.out.println(person);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
1.2)線程池執行時,Callable的call方法(Runnable的run方法)拋出異常後,會出現什麼?
在上面的例子中我們可以看到,線程池無論是執行Callable
還是Runnable
,調用返回的Future對象get()
方法時需要處理兩種異常(如果是調用get(timeout)
方法,需要處理三種異常),如下:
//線上程池上運行
Future<Object> future = threadPool.submit(callable);
try {
System.out.println("feature.get");
Object x = future.get(900,TimeUnit.MILLISECONDS);
System.out.println(x);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("execute exception...");
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
- 如果get方法被打斷,進入
InterruptedException
異常; - 如果線程執行過程(call、run方法)中拋出異常,進入
ExecutionException
異常; - 如果get方法超時,進入
TimeoutException
異常;
1.3)submit()和execute()方法區別:
ExecutorService
、ScheduledExecutorService
介面的submit()
和execute()
方法都是把任務提交到線程池中,但二者的區別是
- 接收的參數不一樣,
execute
只能接收Runnable
類型、submit
可以接收Runnable
和Callable
兩種類型; submit
有返回值,而execute
沒有返回值;submit
方便Exception
處理;
1)submit方法內部實現:
其實submit
方法也沒有什麼神秘的,就是將我們的任務封裝成了RunnableFuture
介面(繼承了Runnable、Future介面),再調用execute
方法,我們看源碼:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); //轉成 RunnableFuture,傳的result是null
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
2)newTaskFor方法內部實現:
newTaskFor
方法是new了一個FutureTask
返回,所以三個方法其實都是把task轉成FutureTask
,如果task是Callable
,就直接賦值,如果是Runnable
就轉為Callable
再賦值。
當submit
參數是Callable
時:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
當submit
參數是Runnable
時:
// 按順序看,層層調用
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result); //轉 runnable 為 callable
this.state = NEW;
}
// 以下為Executors中的方法
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> { //適配器
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
看了源碼就揭開了神秘面紗了,就是因為Future
需要返回結果,所以內部task必須是Callable
,如果task是Runnable
就偷天換日,在Runnable
外麵包個Callable
馬甲,返回的結果在構造時就寫好。
參考:https://blog.csdn.net/liuxiao723846/article/details/108024212
1.4)ScheduledExecutorService介面:
繼承ExecutorService
,並且提供了按時間安排執行任務的功能,它提供的方法主要有:
schedule(task, initDelay)
: 安排所提交的Callable或Runnable任務在initDelay指定的時間後執行;scheduleAtFixedRate()
:安排所提交的Runnable任務按指定的間隔重覆執行;scheduleWithFixedDelay()
:安排所提交的Runnable任務在每次執行完後,等待delay所指定的時間後重覆執行;
註:該介面的實現類是
ScheduledThreadPoolExecutor
。
2、Callable介面:
jdk1.5以後創建線程可以通過一下方式:
- 繼承
Thread
類,實現void run()
方法; - 實現
Runnable
介面,實現void run()
方法; - 實現
Callable
介面,實現V call() Throws Exception
方法
1)Callable和Runnale介面區別:
Callable
可以拋出異常,和Future
、FutureTask
配合可以用來獲取非同步執行的結果;Runnable
沒有返回結果,異常只能內部消化;
2)執行Callable的線程的方法可以通過以下兩種方式:
- 藉助
FutureTask
,使用Thread
的start
方法來執行; - 加入到線程池中,使用線程池的
execute
或submit
執行;
註:
Callable
無法直接使用Thread來執行;
我們都知道,Callable
帶有返回值的,如果我們不需要返回值,卻又想用Callable
該如何做?
jdk中有個Void類型(大寫V),但必須也要return null
。
threadpool.submit(new Callable<Void>() {
@Override
public Void call() {
//...
return null;
}
});
3)通過Executors工具類可以把Runnable介面轉換成Callable介面:
Executors
中的callable
方法可以將Runnable
轉成Callable
,如下:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
RunnableAdapter
類在上面已經看過源碼,原理就是將返回值result作為成員變數,通過參數傳遞進去,進而實現了Runnable
可以返回值。
示例:
public static void test5() {
Person p = new Person(0,"person");
RunnableTask runnableTask = new RunnableTask(p);//創建runnable
Callable<Person> callable = Executors.callable(runnableTask,p);//轉換
Future<Person> future1 = threadPool.submit(callable);//線上程池上執行Callable
try {
Person person = future1.get();
System.out.println(person);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Runnable runnable = new Runnable() {//創建Runnable
@Override
public void run() {
}
};
Callable<Object> callable2 = Executors.callable(runnable);//轉換
Future<Object> future2 = threadPool.submit(callable2);//線上程池上執行Callable
try {
Object o = future2.get();
System.out.println(o);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
3、Future介面:
3.1)Future是用來獲取非同步計算結果的介面,常用方法:
boolean cancel(boolean mayInterruptIfRunning)
:試圖取消對此任務的執行。如果任務已完成、或已取消,或者由於某些其他原因而無法取消,則此嘗試將失敗。當調用 cancel 時,如果調用成功,而此任務尚未啟動,則此任務將永不運行。如果任務已經啟動,則mayInterruptIfRunning
參數確定是否應該以試圖停止任務的方式來中斷執行此任務的線程。此方法返回後,對isDone()
的後續調用將始終返回 true。如果此方法返回 true,則對isCancelled()
的後續調用將始終返回 true。boolean isCancelled()
:如果在任務正常完成前將其取消,則返回 true。boolean isDone()
:如果任務已完成,則返回 true,可能由於正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回 true。V get()throws InterruptedException,ExecutionException
:獲取非同步結果,此方法會一直阻塞等到計算完成;V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException
:獲取非同步結果,此方法會在指定時間內一直阻塞等到計算完成,超時後會拋出超時異常。
通過方法分析我們也知道實際上Future提供了3種功能:
- 能夠中斷執行中的任務;
- 判斷任務是否執行完成;
- 獲取任務執行完成後額結果。
但是Future只是一個介面,我們無法直接創建對象,因此就需要其實現類FutureTask登場啦。
3.2)FutureTask類:
1)FutureTask類的實現:
public class FutureTask<V> implements RunnableFuture<V> {
//...
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
FutureTask
實現了Runnable
、Future
兩個介面。由於FutureTask
實現了Runnable
,因此它既可以通過Thread
包裝來直接執行,也可以提交給ExecuteService
來執行。並且還可以直接通過get()
函數獲取執行結果,該函數會阻塞,直到結果返回。
因此FutureTask
既是Future
、Runnable
,又是包裝了Callable
( 如果是Runnable最終也會被轉換為Callable ), 它是這兩者的合體。
2)FutureTask的構造函數:
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
3.3)示例:(FutureTask兩種構造函數、以及在Thread和線程池上運行)
1)FutureTask包裝過的Callable在Thread、線程池上執行:
public static void test3() {
int a = 1,b = 2;
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return a + b;
}
};
//通過futureTask來執行Callable
FutureTask<Integer> futureTask = new FutureTask<>(callable);
//1.使用Thread執行線程
new Thread(futureTask).start();
try {
Integer integer = futureTask.get();
System.out.println(integer);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//2.使用線程池執行線程
Executors.newFixedThreadPool(1).submit(futureTask);
threadPool.shutdown();
try {
Integer integer = futureTask.get();
System.out.println(integer);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
2)FutureTask包裝過的Runnable在Thread、線程池上執行:
public static void test4() {
Person p = new Person(0,"person");
RunnableTask runnableTask = new RunnableTask(p);
//創建futureTask來執行Runnable
FutureTask<Person> futureTask = new FutureTask<>(runnableTask,p);
//1.使用Thread執行線程
new Thread(futureTask).start();
try {
Person x = futureTask.get();
System.out.println(x);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//2.使用線程池執行線程
threadPool.submit(futureTask);
threadPool.shutdown();
try {
Person y = futureTask.get();
System.out.println(y);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
Person、RunnableTask類同上面的示例中。
來源:https://blog.csdn.net/liuxiao723846
更多文章推薦:
2.2,000+ 道 Java面試題及答案整理(2024最新版)
3.免費獲取 IDEA 激活碼的 7 種方式(2024最新版)
覺得不錯,別忘了隨手點贊+轉發哦!