引入線程池的原因 通常我們需要使用線程去完成某項任務的時候都會去創建一個線程,一般都會這麼寫: 這樣操作直接且簡單,當然是沒有錯的,但是卻存在這一些問題。在應付一些線程併發不多的情況時是完全夠用的,但是如果併發的線程數量很多,就會造成系統的效率降低。主要會造成如下影響: 頻繁創建和銷毀線程占用大量不 ...
引入線程池的原因
通常我們需要使用線程去完成某項任務的時候都會去創建一個線程,一般都會這麼寫:
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// TODO
}
});
thread.start();
這樣操作直接且簡單,當然是沒有錯的,但是卻存在這一些問題。在應付一些線程併發不多的情況時是完全夠用的,但是如果併發的線程數量很多,就會造成系統的效率降低。主要會造成如下影響:
- 頻繁創建和銷毀線程占用大量不必要的系統處理時間,影響性能。
- 頻繁創建和銷毀線程容易導致 GC 的頻繁執行,造成記憶體抖動,導致移動設備出現卡頓。
- 大量的線程併發非常消耗記憶體,容易造成 OOM 問題。
- 不利於擴展,如定時執行、周期執行、線程中斷。
而解決上面問題的方法,就是引入線程池的概念。線程池使得線程可以重覆利用,執行完任務後並不會銷毀線程,而是繼續執行其他的任務。這樣可以有效的減少並控制創建線程的數量,防止併發線程過多,記憶體消耗過度,從而提高系統的性能。
同時線程池還可以很方便的控制線程的併發數量,線程的定時任務,單線程順序執行等等。
ExecutorService 介面
ExecutorService 就是一般所說的線程池介面,它繼承 Executor 介面,同時還提供了一些管理終止的方法,以及可以跟蹤一個或者多個非同步任務線程執行狀況並生成 Future 的方法。
而真正意義上的線程池是 ThreadPoolExecutor,它實現了 ExecutorService 介面,並且封裝了一系列介面使其具有線程池的特性。
線程池:ThreadPoolExecutor
查看源碼後我們發現 ThreadPoolExecutor 有四個構造方法,都是調用其中一個構造方法進行初始化操作。具體代碼如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
參數 | 作用 |
---|---|
corePoolSize |
線程池中的核心線程數量 |
maximumPoolSize |
線程池中的最大線程數量 |
keepAliveTime |
超過核心線程數量的多餘空閑線程,在超過 keepAliveTime 時間內沒有任務,則被銷毀 |
unit |
keepAliveTime 的時間單位 |
workQueue |
任務隊列,用來存儲已經提交但沒有被執行的任務,不同的線程池採取的排隊策略各不相同 |
threadFactory |
線程工廠,用來創建線程池中的線程 |
handler |
當最大線程數和任務隊列已經飽和,而導致無法接受新的任務時,用來拒絕接受任務的策略 |
五種不同功能的線程池
可以看出想要創建一個 ThreadPoolExecutor 對象並不容易,所以一般推薦使用工廠類 Executors 的工廠方法來創建線程池對象。Executors 主要提供了下麵五種不同功能的線程池:
1. 固定型線程池 newFixedThreadPool
創建一個固定線程數量的線程池。每次提交一個任務就創建一個線程,直到達到設定的線程數量,之後線程池中的線程數量不再變化。當有新任務提交時,如果有空閑的線程,則由該空閑線程處理任務,否則將任務存至任務隊列中,一旦有線程空閑下來,則按照 FIFO 的方式處理隊列中的任務。該線程池適合一些穩定的正規併發線程,多用於伺服器。
定義:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
運行實例:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println("Thread: " + threadName + ", running Task" + index);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
運行結果:
02-24 08:15:20 I/System.out: Thread: pool-1-thread-1, running Task1
02-24 08:15:20 I/System.out: Thread: pool-1-thread-2, running Task2
02-24 08:15:20 I/System.out: Thread: pool-1-thread-3, running Task3
02-24 08:15:23 I/System.out: Thread: pool-1-thread-1, running Task4
02-24 08:15:23 I/System.out: Thread: pool-1-thread-2, running Task5
02-24 08:15:23 I/System.out: Thread: pool-1-thread-3, running Task6
02-24 08:15:26 I/System.out: Thread: pool-1-thread-1, running Task7
02-24 08:15:26 I/System.out: Thread: pool-1-thread-2, running Task8
02-24 08:15:26 I/System.out: Thread: pool-1-thread-3, running Task9
02-24 08:15:29 I/System.out: Thread: pool-1-thread-1, running Task10
觀察線程名發現一共只創建了 3 個線程處理任務,當所有線程都處於運行狀態時,再提交的任務則會進入等待,3 個線程處理完之前任務後會被等待隊列中的任務復用,所以觀察時間發現,每次都是 3 個任務同時運行,間隔 3 秒後再運行後面 3 個任務,任務執行的順序即提交的順序。
2. 緩存型線程池 newCachedThreadPool
創建一個可以根據實際情況調整線程數量的線程池。線程池中的線程數量不確定,當需要時會創建新的線程,當有線程空閑時復用空閑線程。通常對執行大量短暫非同步任務的程式,可以提升其效率。
當然,線程池中的線程並不會越來越多,每個線程都有一個參數用來設置保持活動的時間,一旦線程空閑時間超過了該時間,則立即銷毀該線程,預設的保持活動時間為 60 秒,我們可以在其定義中看到這個預設參數。
定義:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
運行實例:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 1; i <= 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
final int index = i;
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println("Thread: " + threadName + ", running Task" + index);
try {
Thread.sleep(index * 500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
運行結果:
02-24 08:25:29 I/System.out: Thread: pool-1-thread-1, running Task1
02-24 08:25:30 I/System.out: Thread: pool-1-thread-1, running Task2
02-24 08:25:31 I/System.out: Thread: pool-1-thread-2, running Task3
02-24 08:25:32 I/System.out: Thread: pool-1-thread-1, running Task4
02-24 08:25:33 I/System.out: Thread: pool-1-thread-2, running Task5
02-24 08:25:34 I/System.out: Thread: pool-1-thread-1, running Task6
02-24 08:25:35 I/System.out: Thread: pool-1-thread-3, running Task7
02-24 08:25:36 I/System.out: Thread: pool-1-thread-2, running Task8
02-24 08:25:37 I/System.out: Thread: pool-1-thread-1, running Task9
02-24 08:25:38 I/System.out: Thread: pool-1-thread-4, running Task10
我們讓任務間隔 1 秒提交一次,並且每個任務的時間逐漸增長,從結果可以發現,每隔 1 秒就會有一個任務被執行,剛開始一個線程可以處理過來,但隨著任務時間的增長,線程池創建了新的線程來處理那些提交的任務,當有之前的線程處理完後,也會被賦予任務執行。最後一共創建了 4 個線程。
3. 單例線程 newSingleThreadExecutor
創建一個只含有一個線程的線程池。每次只能執行一個線程任務,多餘的任務會保存到一個任務隊列中,當這個線程空閑時,再按照 FIFO 的方式順序執行隊列中的任務。
定義:
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
運行實例:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 1; i <= 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println("Thread: " + threadName + ", running Task" + index);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
運行結果:
02-24 09:00:18 I/System.out: Thread: pool-1-thread-1, running Task1
02-24 09:00:19 I/System.out: Thread: pool-1-thread-1, running Task2
02-24 09:00:20 I/System.out: Thread: pool-1-thread-1, running Task3
02-24 09:00:21 I/System.out: Thread: pool-1-thread-1, running Task4
02-24 09:00:22 I/System.out: Thread: pool-1-thread-1, running Task5
02-24 09:00:23 I/System.out: Thread: pool-1-thread-1, running Task6
02-24 09:00:24 I/System.out: Thread: pool-1-thread-1, running Task7
02-24 09:00:25 I/System.out: Thread: pool-1-thread-1, running Task8
02-24 09:00:26 I/System.out: Thread: pool-1-thread-1, running Task9
02-24 09:00:27 I/System.out: Thread: pool-1-thread-1, running Task10
結果顯而易見,從始至終只有一個線程在執行,該線程順序執行已提交的線程。
4. 調度型線程池 newScheduledThreadPool
創建一個可指定大小的,可以調度線程根據 schedule 延遲執行,或者周期執行的線程池。
運行實例:
System.out.println("Start Task");
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
for (int i = 1; i <= 10; i++) {
final int index = i;
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println("Thread: " + threadName + ", running Task" + index);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 3, TimeUnit.SECONDS);
}
運行結果:
02-24 09:47:45 I/System.out: Start Task
02-24 09:47:48 I/System.out: Thread: pool-1-thread-1, running Task1
02-24 09:47:48 I/System.out: Thread: pool-1-thread-2, running Task2
02-24 09:47:48 I/System.out: Thread: pool-1-thread-3, running Task3
02-24 09:47:50 I/System.out: Thread: pool-1-thread-1, running Task5
02-24 09:47:50 I/System.out: Thread: pool-1-thread-2, running Task4
02-24 09:47:50 I/System.out: Thread: pool-1-thread-3, running Task6
02-24 09:47:52 I/System.out: Thread: pool-1-thread-1, running Task7
02-24 09:47:52 I/System.out: Thread: pool-1-thread-3, running Task8
02-24 09:47:52 I/System.out: Thread: pool-1-thread-2, running Task9
02-24 09:47:54 I/System.out: Thread: pool-1-thread-1, running Task10
運行情況基本和 newFixedThreadPool 類似,但是在開始運行時,有 3 秒鐘的延遲。
5. 調度型單例線程 newSingleThreadScheduledExecutor
創建一個只含有一個線程,可以調度線程根據 schedule 延遲執行,或者周期執行的線程池。
運行實例:
System.out.println("Start Task");
ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
singleThreadScheduledExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println("Thread: " + threadName + ", running Task1");
}
}, 1, 2, TimeUnit.SECONDS);
運行結果:
02-24 10:01:36 I/System.out: Start Task
02-24 10:01:37 I/System.out: Thread: pool-1-thread-1, running Task
02-24 10:01:39 I/System.out: Thread: pool-1-thread-1, running Task
02-24 10:01:41 I/System.out: Thread: pool-1-thread-1, running Task
02-24 10:01:43 I/System.out: Thread: pool-1-thread-1, running Task
02-24 10:01:45 I/System.out: Thread: pool-1-thread-1, running Task
02-24 10:01:47 I/System.out: Thread: pool-1-thread-1, running Task
02-24 10:01:49 I/System.out: Thread: pool-1-thread-1, running Task
02-24 10:02:51 I/System.out: Thread: pool-1-thread-1, running Task
02-24 10:02:53 I/System.out: Thread: pool-1-thread-1, running Task
02-24 10:02:55 I/System.out: Thread: pool-1-thread-1, running Task
運行情況基本和 newSingleThreadExecutor 類似,但是在開始運行時,有 1 秒鐘的延遲,並且每隔 2 秒周期性的執行一次任務。
自定義線程池
如果仔細觀察上面五種線程池的定義就會有所發現,其實線程池的功能不同,和其內部的 BlockingQueue 不同有關。如果我們想要實現一些不同功能的自定義線程池,可以從其中的 BlockingQueue 著手。
比如現在有一種 BlockingQueue 的實現類是 PriorityBlockingQueue,他可以實現隊列按照優先順序排序,那我們就可以利用它實現一個按任務的優先順序來處理任務的線程池。
首先創建一個實現了 Runnable 與 Comparable 介面的抽象類 PriorityRunnable,用來存放任務。實現了 Comparable 介面就可以進行優先順序的比較。
public abstract class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
private int mPriority;
public PriorityRunnable(int priority) {
if (priority < 0 ) {
throw new IllegalArgumentException();
}
mPriority = priority;
}
@Override
public int compareTo(PriorityRunnable runnable) {
int otherPriority = runnable.getPriority();
if (mPriority < otherPriority) {
return 1;
} else if (mPriority > otherPriority) {
return -1;
} else {
return 0;
}
}
public int getPriority() {
return mPriority;
}
}
接著就可以創建一個基於 PriorityBlockingQueue 實現的線程池,其核心數量定義為 3,方便測試結果。然後創建 10 個優先順序依次增加的 PriorityRunnable 任務,提交給線程池執行。
ExecutorService threadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
for (int i = 1; i <= 10; i++) {
final int priority = i;
threadPool.execute(new PriorityRunnable(priority) {
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println("Thread: " + name + ", running PriorityTask" + priority);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
運行結果:
02-25 11:34:45 I/System.out: Thread: pool-1-thread-1, running PriorityTask1
02-25 11:34:45 I/System.out: Thread: pool-1-thread-2, running PriorityTask2
02-25 11:34:45 I/System.out: Thread: pool-1-thread-3, running PriorityTask3
02-25 11:34:47 I/System.out: Thread: pool-1-thread-1, running PriorityTask10
02-25 11:34:47 I/System.out: Thread: pool-1-thread-2, running PriorityTask9
02-25 11:34:47 I/System.out: Thread: pool-1-thread-3, running PriorityTask8
02-25 11:34:49 I/System.out: Thread: pool-1-thread-1, running PriorityTask7
02-25 11:34:49 I/System.out: Thread: pool-1-thread-2, running PriorityTask6
02-25 11:34:49 I/System.out: Thread: pool-1-thread-3, running PriorityTask5
02-25 11:34:51 I/System.out: Thread: pool-1-thread-1, running PriorityTask4
可以看到,運行結果和 newFixedThreadPool 非常相似,唯一不同就在於等待任務不是按照 FIFO 的方式執行,而是優先順序較高的任務先執行。
線程池的優化
線程池可以自定義內部線程的數量,定義的數量影響著系統的性能表現,如果定義過大會浪費資源,定義過小線程併發量太低,處理速度也變低了,所以合理的設置線程數量是很重要的。通常需要考慮 CPU 的核心數、記憶體的大小、併發請求的數量等因素。
通常核心線程數量可以設為 CPU 核心數 + 1,最大線程數量可以設為 CPU 核心數 * 2 + 1。
獲取 CPU 核心數的方法為:
Runtime.getRuntime().availableProcessors();
線程池的調用方式
以下方法都可以調用線程池執行,但是效果不同,可以參考文檔或源碼瞭解:
void execute(Runnable command);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
線程池的關閉
void shutdown();
該方法不再接受新的任務提交,在終止線程池之前,允許執行完以前提交的任務。
List<Runnable> shutdownNow();
該方法不再接受新的任務提交,並且把任務隊列中的任務直接移除,嘗試停止正在執行的任務。返回等待的任務列表。