【1】為什麼要使用線程池? 示例演示: //設置業務模擬 class MyRunnable implements Runnable { private int count; public MyRunnable(int count) { this.count = count; } public int ...
【1】為什麼要使用線程池?
示例演示:
//設置業務模擬 class MyRunnable implements Runnable { private int count; public MyRunnable(int count) { this.count = count; } public int getCount() { return count; } @Override public void run() { for (int i = 0; i < 100000; i++) { count += i; } System.out.println("結果:"+count); } } //模擬線程池復用線程執行業務 public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; ExecutorService executorService = Executors.newSingleThreadExecutor(); MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { executorService.execute(myRunnable); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("時間:"+(System.currentTimeMillis() - start)); } //模擬每次執行業務都開一個線程 public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { Thread thread = new Thread(myRunnable); thread.start(); thread.join(); } System.out.println("時間:" + (System.currentTimeMillis() - start)); }
示例結果:
採用每次都開一個線程的結果是292毫秒,而線程池的是69毫秒。(隨著業務次數的增多這個數值的差距會越大)
示例說明:
如果每個請求到達就創建一個新線程,開銷是相當大的。在實際使用中,伺服器在創建和銷毀線程上花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的用戶請求的時間和資源要多的多。除了創建和銷毀線程的開銷之外,活動的線程也需要消耗系統資源。
如果併發的請求數量非常多,但每個線程執行的時間很短,這樣就會頻繁的創建和銷毀線程,如此一來會大大降低系統的效率。可能出現伺服器在為每個請求創建新線程和銷毀線程上花費的時間和消耗的系統資源要比處理實際的用戶請求的時間和資源更多。(說明瞭我們什麼時候使用線程池:1.單個任務處理時間比較短;2.需要處理的任務數量很大;)
線程池主要用來解決線程生命周期開銷問題和資源不足問題。通過對多個任務重覆使用線程,線程創建的開銷就被分攤到了多個任務上了,而且由於在請求到達時線程已經存在,所以消除了線程創建所帶來的延遲。這樣,就可以立即為請求服務,使用應用程式響應更快。另外,通過適當的調整線程中的線程數目可以防止出現資源不足的情況。
【2】線程池的介紹
(1)線程池優勢
1.重用存在的線程,減少線程創建,消亡的開銷,提高性能
2.提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
3.提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。
(2)常見線程池
1.newSingleThreadExecutor :單個線程的線程池,即線程池中每次只有一個線程工作,單線程串列執行任務
2.newFixedThreadExecutor(n) :固定數量的線程池,每提交一個任務就是一個線程,直到達到線程池的最大數量,然後後面進入等待隊列直到前面的任務完成才繼續執行
3.newCacheThreadExecutor(推薦使用) :可緩存線程池, 當線程池大小超過了處理任務所需的線程,那麼就會回收部分空閑(一般是60秒無執行)的線程,當有任務來時,又智能的添加新線程來執行。
4.newScheduleThreadExecutor :大小無限制的線程池,支持定時和周期性的執行線程
5.常見線程池的說明
在阿裡的開發手冊中其實不推薦我們使用預設的線程池,為什麼?
【1】Executors 返回的線程池對象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
【2】其次newCacheThreadExecutor,沒有核心線程數,且非核心線程數是最大值,不斷創建線程容易出現CPU100%的問題。
(3)預設線程池
1.ThreadPoolExecutor
1)說明
實際上不管是newSingleThreadExecutor,newFixedThreadExecutor還是newCacheThreadExecutor,他們都是使用ThreadPoolExecutor去生成的。
只不過由於參數不同導致產生的線程池的不同,因此,我們常使用是ThreadPoolExecutor去自建自己想要的線程池。
2)參數解析
1.corePoolSize
線程池中的核心線程數,當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等於corePoolSize;如果當前線程數為corePoolSize,繼續提交的任務被保存到 阻塞隊列中,等待被執行;如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有核心線程。
2.maximumPoolSize
線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小於maximumPoolSize;
3.keepAliveTime
線程池維護線程所允許的空閑時間。當線程池中的線程數量大於corePoolSize的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;
4.unit
keepAliveTime的單位;
5.workQueue
用來保存等待被執行的任務的阻塞隊列,且任務必須實現Runable介面,在JDK中提供瞭如下阻塞隊列:
1、ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO排序任務;
2、LinkedBlockingQuene:基於鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQuene;
3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene;
4、priorityBlockingQuene:具有優先順序的無界阻塞隊列;
6.threadFactory
它是ThreadFactory類型的變數,用來創建新線程。預設使用Executors.defaultThreadFactory() 來創建線程。使用預設的ThreadFactory來創建線程時,會使新創建的線程具有相同的NORM_PRIORITY優先順序並且是非守護線程,同時也設置了線程的名稱。
7.handler
線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續提交任務,必須採取一種策略處理該任務,線程池提供了4種策略:
1、AbortPolicy:直接拋出異常,預設策略;
2、CallerRunsPolicy:用調用者所在的線程來執行任務;
3、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
4、DiscardPolicy:直接丟棄任務;
上面的4種策略都是ThreadPoolExecutor的內部類。
當然也可以根據應用場景實現RejectedExecutionHandler介面,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。(自定義的才是最常用的)
【3】線程池相關的類分析
1.ExecutorService介面與Executor介面
//定義了一個用於執行Runnable的execute方法 public interface Executor { void execute(Runnable command); } /** * 介面ExecutorService,其中定義了線程池的具體行為 * 1,execute(Runnable command):履行Ruannable類型的任務, * 2,submit(task):可用來提交Callable或Runnable任務,並返回代表此任務的Future 對象 * 3,shutdown():在完成已提交的任務後封閉辦事,不再接管新任務, * 4,shutdownNow():停止所有正在履行的任務並封閉辦事。 * 5,isTerminated():測試是否所有任務都履行完畢了。 * 6,isShutdown():測試是否該ExecutorService已被關閉。 */ public interface ExecutorService extends Executor { // 停止線程池 void shutdown(); // 立即停止線程池,返回尚未執行的任務列表 List<Runnable> shutdownNow(); // 線程池是否停止 boolean isShutdown(); // 線程池是否終結 boolean isTerminated(); // 等待線程池終結 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交Callable類型任務 <T> Future<T> submit(Callable<T> task); // 提交Runnable類型任務,預先知道返回值 <T> Future<T> submit(Runnable task, T result); // 提交Runnable類型任務,對返回值無感知 Future<?> submit(Runnable task); // 永久阻塞 - 提交和執行一個任務列表的所有任務 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 帶超時阻塞 - 提交和執行一個任務列表的所有任務 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 永久阻塞 - 提交和執行一個任務列表的某一個任務 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 帶超時阻塞 - 提交和執行一個任務列表的某一個任務 <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
2.抽象類AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } .... }
3.ThreadPoolExecutor類
public class ThreadPoolExecutor extends AbstractExecutorService { ... public void execute(Runnable command) { if (command == null) int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } ... }
4.ScheduledThreadPoolExecutor類
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { ... public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } public Future<?> submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } } ... }
5.問題點
1)execute方法與submit方法的區別?
【1】最明顯的就是 :
void execute() //提交任務無返回值
Future<?> submit() //任務執行完成後有返回值
【2】另外一個不明顯的就是隊列的提交方法(add【ScheduledThreadPoolExecutor類中使用】與offer【ThreadPoolExecutor類中使用】)
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
明顯當隊列滿了的時候,add方法會拋出異常,而offer不會。
【4】線程池的狀態分析
1.線程池存在5種狀態
1)RUNNING = ‐1 << COUNT_BITS; //高3位為111 運行狀態
2)SHUTDOWN = 0 << COUNT_BITS; //高3位為000 關閉狀態
3)STOP = 1 << COUNT_BITS; //高3位為001 停止狀態
4)TIDYING = 2 << COUNT_BITS; //高3位為010 整理狀態
5)TERMINATED = 3 << COUNT_BITS; //高3位為011 銷毀狀態
2.狀態說明
1、RUNNING
(1) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(02) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處於RUNNING狀態,並且線程池中的任務數為0!
2、 SHUTDOWN
(1)狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
(2)狀態切換:調用線程池的shutdown()介面時,線程池由RUNNING -> SHUTDOWN。
3、STOP
(1)狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,並且會中斷正在處理的任務。
(2)狀態切換:調用線程池的shutdownNow()介面時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1)狀態說明:當所有的任務已終止,ctl記錄的”任務數量”為0,線程池會變為TIDYING 狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想線上程池變為TIDYING時,進行相應的處理; 可以通過重載terminated()函數來實現。
(2)狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空並且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。 當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。
5、 TERMINATED
(1)狀態說明:線程池徹底終止,就變成TERMINATED狀態。
(2)狀態切換:線程池處在TIDYING狀態時,執行完terminated()之後,就會由 TIDYING -> TERMINATED。
進入TERMINATED的條件如下:
線程池不是RUNNING狀態;
線程池狀態不是TIDYING狀態或TERMINATED狀態;
如果線程池狀態是SHUTDOWN並且workerQueue為空;
workerCount為0;
設置TIDYING狀態成功。
3.彙總
預設情況下,如果不調用關閉方法,線程池會一直處於 RUNNING 狀態,而線程池狀態的轉移有兩個路徑:當調用 shutdown() 方法時,線程池的狀態會從 RUNNING 到 SHUTDOWN,再到 TIDYING,最後到 TERMENATED 銷毀狀態;當調用 shutdownNow() 方法時,線程池的狀態會從 RUNNING 到 STOP,再到 TIDYING,最後到 TERMENATED 銷毀狀態。
4.圖示
【5】線程池的源碼解析
1.針對自定義線程池的運行分析
1)示例代碼:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));//自定義線程 for (int i = 1; i <= 100; i++) { threadPoolExecutor.execute(new MyTask(i)); }
2)示例結果:
3)示例疑問:
輸出的順序並不是預想的1-5,6-10,11-15,16-20。反而是1-5,16-20,6-10,11-15。(深入源碼查探原因)
2.針對自定義線程池ThreadPoolExecutor類的運行分析
1)ThreadPoolExecutor類重要屬性 private final AtomicInteger ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //預設值-536870912 private static final int COUNT_BITS = Integer.SIZE - 3; //預設值29,轉為2進位11101 private static final int CAPACITY = (1 << COUNT_BITS)-1; //預設值536870911,轉為2進位11111111111111111111111111111 private static final int RUNNING = -1 << COUNT_BITS; //-536870912 private static final int SHUTDOWN = 0 << COUNT_BITS; //0 private static final int STOP = 1 << COUNT_BITS; //536870912 private static final int TIDYING = 2 << COUNT_BITS; //1073741824 private static final int TERMINATED = 3 << COUNT_BITS; //1610612736 //ctl相關方法 private static int runStateOf(int c) { return c & ~CAPACITY; } //runStateOf:獲取運行狀態;//~x=-(x+1) //預設值0 private static int workerCountOf(int c) { return c & CAPACITY; } //workerCountOf:獲取活動線程數; //預設值0,當線程數+1是值也會+1 private static int ctlOf(int rs, int wc) { return rs | wc; } //ctlOf:獲取運行狀態和活動線程數的值。//預設值-536870912 說明: ctl 是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個欄位, 它包含兩部分的信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount),
可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。 PS: 1.&和&&的區別 相同點: 最終得到的boolean值結果一樣,都是“並且and”的意思 不同點: &既是邏輯運算符也是位運算符;&&只是邏輯運算符 &不具有短路效果,即左邊false,右邊還會執行;&&具有短路效果,左邊為false,右邊則不執行 2.| 和 || 的區別 相同點: 最終得到的boolean值結果一樣,都是“或者or”的意思 不同點: | 既是邏輯運算符也是位運算符;|| 只是邏輯運算符 | 不具有短路效果,即左邊true,右邊還會執行;|| 具有短路效果,左邊為true,右邊則不執行
2)ThreadPoolExecutor類#execute方法【這裡涉及到一個概念,提交優先順序: 核心線程>隊列>非核心線程】
展示
public void execute(Runnable command) { if (command == null) //不能提交空任務 throw new NullPointerException(); int c = ctl.get(); //獲取運行的線程數 //核心線程數不滿 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) //在addWorker中創建工作線程執行任務 return; c = ctl.get(); } //線程還在運行,且核心數滿了,放入線程池隊列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//線程池是否處於運行狀態,如果不是,則剛塞入的任務要移除 reject(command); //走拒絕策略 //這一步其實沒有很大意義,除非出現線程池所有線程完蛋了,但是隊列還有任務的情況。(一般是進入時時運行態,然後遇到狀態變更的情況) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //插入隊列不成功,且當前線程數數量小於最大線程池數量,此時則創建新線程執行任務,創建失敗拋出異常 else if (!addWorker(command, false)) reject(command); //走拒絕策略 }
說明
在正常運行狀態下,線程池:核心線程執行任務-》塞入隊列-》非核心線程執行任務。
體現了在併發不激烈的情況下,儘量減少創建線程的操作,用已有的線程。而且核心線程數並不是提前創建的,而是用到的時候才會創建。而且核心線程數不滿,優先以創建線程來執行任務。
邏輯展示
3)ThreadPoolExecutor類#addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //獲取線程池的狀態 int c = ctl.get(); int rs = runStateOf(c); //如果是非運行狀態(因為只有運行狀態是負數) if (rs >= SHUTDOWN && ! //判斷是不是關閉狀態,不接收新任務,但能處理已添加的任務 //任務是不是空任務,隊列是不是空(這一步說明瞭關閉狀態不接受任務) (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取活動線程數 int wc = workerCountOf(c); //檢驗線程數是否大於容量值【這是避免設置的非核心線程數沒有限制大小】 //根據傳入參數判斷核心線程數與非核心線程數是否達到了最大值 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //嘗試增加workerCount數量【也就是活躍線程數+1】,如果成功,則跳出第一個for迴圈 if (compareAndIncrementWorkerCount(c)) break retry; // 如果增加workerCount失敗,則重新獲取ctl的值 c = ctl.get(); // 如果當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for迴圈繼續執行 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; //線程啟動標誌 boolean workerAdded = false; //線程添加標誌 Worker w = null; try { //根據firstTask來創建Worker對象,每一個Worker對象都會創建一個線程 w = new Worker(firstTask); //【調用1】 final Thread t = w.thread; //如果過線程不為空,則試著將線程加入工作隊列中 if (t != null) { final ReentrantLock mainLock = this.mainLock; //加重入鎖 mainLock.lock(); try { // 重新獲取線程的狀態 int rs = runStateOf(ctl.get()); //是否線程池正處於運行狀態 if (rs < SHUTDOWN || //線程池是否處於關閉狀態 且 傳入的任務為空(說明關閉狀態還是能添加工作者,但是不允許添加任務) (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) //判斷線程是否存活 throw new IllegalThreadStateException(); //workers是一個HashSet,將該worker對象添加其中 workers.add(w); //記錄線程工作者的值 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //修改添加標記 workerAdded = true; } } final