Java線程池的原理,主要參數的作用。ThreadPoolExecutor內部有重要的成員變數ctl,類型是AtomicInteger,低29位表示線程池中線程數,通過高3位表示線程池的運行狀態。addWorker的邏輯,runWorker的邏輯 ...
目錄
1 說明
下麵如果有貼出源碼,對應的源碼是JDK8
主要的源碼類
java.util.concurrent.ThreadPoolExecutor、
java.util.concurrent.ThreadPoolExecutor.Worker
java.util.concurrent.AbstractExecutorService
1.1類繼承圖
2 線程池的狀態
3 源碼分析
3.1完整的線程池構造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
3.2 ctl
內部有重要的成員變數ctl,類型是AtomicInteger,低29位表示線程池中線程數,通過高3位表示線程池的運行狀態
COUNT_BITS的值是29
1、RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態的線程池會接收新任務;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態的線程池不會接收新任務;
3、STOP : 1 << COUNT_BITS,即高3位為001;
4、TIDYING : 2 << COUNT_BITS,即高3位為010, 所有的任務都已經終止;
5、TERMINATED: 3 << COUNT_BITS,即高3位為011, terminated()方法已經執行完成
3.3 任務的執行
execute --> addWorker --> Thread.start --> (Thread.run) --> runTask --> getTask
3.3.1 execute(Runnable command)
大致分三個步驟
1、當前運行的線程數量是否小於corePoolSize,直接嘗試addWorker()
2、往阻塞隊列裡面放入Runnable任務
3、如果隊列已經滿了,直接嘗試addWorker()
3.3.2 addWorker(Runnable firstTask, boolean core)
1、前置判斷線程池的狀態
2、通過CAS操作讓ctl加1,表示運行線程數增加1個
3、構造一個Worker w,這裡要特別註意構造方法裡面的這行代碼,this.thread = getThreadFactory().newThread(this),可以看到構造方法內,有一個Thread對象,其使用了ThreadFactory構造了一個新的線程,並且線程的runable是worker本身。
4、執行w.thread.start(),也就是說,當該線程被運行時,Worker中的run方法會被執行
3.3.3 runWorker(Worker w)
通過迴圈調用getTask()獲取要執行的任務task
beforeExecute
task.run()
afterExecute
3.3.4 getTask()
直接貼源碼了
private Runnable getTask() {
boolean timedOut = false; // 是否最後的 poll() 超時了?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // worker是否需要被淘汰
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 這裡會讓線程的數量記錄減,後面的return null,會導致runWorker沒有獲取到數據而讓run()方法走到盡頭,最終當前線程結束
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果需要回收一部分線程,那麼超時時間keepAliveTime後拿不到就數據就繼續迴圈調用,就可以在下一次迴圈的時候進行線程結束回收了;否則一直阻塞下去
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
4 任務執行,帶返回值的
直接貼源碼了
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
代碼比較簡單,把任務封裝成一個既實現Runnable, 也實現Future
5 參考資料
https://blog.csdn.net/programmer_at/article/details/79799267
https://blog.csdn.net/liuzhixiong_521/article/details/87856121