基礎概念 進程(process):進程是電腦中的一個任務,比如打開瀏覽器、IntelliJ IDEA。 線程(thread):進程內部有多個子任務,叫線程。比如IDEA在敲代碼的同時還能自動保存、自動導包,都是子線程做的。 進程和線程的關係就是一個進程包含一個或多個線程。 線程是操作系統調度的最小 ...
基礎概念
- 進程(process):進程是電腦中的一個任務,比如打開瀏覽器、IntelliJ IDEA。
- 線程(thread):進程內部有多個子任務,叫線程。比如IDEA在敲代碼的同時還能自動保存、自動導包,都是子線程做的。
進程和線程的關係就是一個進程包含一個或多個線程。
線程是操作系統調度的最小任務單位。線程自己不能決定什麼時候執行,由操作系統決定什麼時候調度。因此多線程編程中,代碼的先後順序不代表代碼的執行順序。
多線程有什麼好處?
- 提高應用程式的性能。非同步編程讓程式更快的響應。
- 提高CPU利用率。一個線程阻塞,另一個線程繼續執行,充分利用CPU。
同時多線程也會帶來安全問題,比如多個線程讀寫一個共用變數,會出現數據不一致的問題。
什麼時候考慮用多線程?
- 高併發。系統在同一時間要處理多個任務時,需要用多線程。
- 很耗時的操作。如文件讀寫,非同步執行不讓進程阻塞。
- 不影響方法主流程邏輯,但又影響介面性能的操作,如數據同步,使用非同步方式能提高介面性能。
創建線程的方式
多線程的創建方法基本有四種:
- 繼承
Thread
類 - 實現
Runnalble
介面 - 實現
Callable
介面 - 線程池
1.繼承Thread類
public class ThreadTest extends Thread {
@Override
public void run() {
System.out.println("新線程開始...");
}
public static void main(String[] args) {
ThreadTest t = new ThreadTest();
t.start();
System.out.println("main線程結束...");
}
}
main線程結束...
新線程開始...
啟動一個新線程總是調用它的start()
方法,而不是run()
方法;ThreadTest
子線程啟動後,它跟main
就開始同時運行了,誰先執行誰後執行由操作系統調度。所以多線程代碼的執行順序跟代碼順序無關。
2.實現Runnable介面
實現Runnable
介面,重寫run()
方法,作為構造器參數傳給Thread
,調用start()
方法啟動線程。
public class Test {
public static void main(String[] args) {
RunnableThread r = new RunnableThread();
new Thread(r).start();
new Thread(r).start();
}
}
class RunnableThread implements Runnable {
@Override
public void run() {
System.out.println("新線程開始...");
}
}
一般推薦使用實現Runnable
的方式來創建新線程,它的優點有:
- Java中只有單繼承,介面則可以多實現。如果一個類已經有父類,它就不能再繼承
Thread
類了,繼承了Thread
類就不能再繼承其他類,有局限性。實現Runnable
介面則沒有局限性。 - 實現
Runnable
介面的類具有共用數據的特性,它可以同時作為多個線程的執行單位(target
),此時多個線程操作的是同一個對象的run
方法,這個對象所有變數在這幾個線程間是共用的。而繼承Thread的方式做不到,比如A extends Thread
,每次啟動線程都是new A().start()
,每次的A對象都不同。
3. 實現Callable介面
Callable
區別於Runnable
介面的點在於,Callable
的方法有返回值,還能拋出異常。
public interface Callable<V> {
V call() throws Exception;
}
Callable
的用法:
- 配合
FutureTask
一起使用,FutureTask
是RunnableFuture
介面的典型實現,RunnableFuture
介面從名字來看,它同時具有Runnable
和Future
介面的的能力。FutureTask
提供2個構造器,同時支持Callable
方式和Runnable
方式的任務。FutureTask
可作為任務傳給Thread
的構造器。 - 使用線程池時,調用
ExecutorService#submit
方法,返回一個Future
對象。 Future
對象的get()
方法能返回非同步執行的結果。調用get()
方法時,如果非同步任務已經完成,就直接返回結果。如果非同步任務還沒完成,那麼get()
方法會阻塞,一直等待任務完成才返回結果,這一點也是FutureTask
的缺點。
Callable
和FutureTask
一起使用的例子:
public class CallableTest {
public static void main(String[] args) {
// 創建Callable介面實現類的對象
CallableThread sumThread = new CallableThread();
// 創建FutureTask對象
FutureTask<Integer> futureTask = new FutureTask<>(sumThread);
// 將FutureTask的對象作為參數傳遞到Thread類的構造器中,創建Thread對象,並調用start()
new Thread(futureTask).start();
try {
// 獲取Callable中call方法的返回值
Integer sum = futureTask.get();
System.out.println("總和為" + sum);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("main線程結束");
}
}
class CallableThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
Thread.sleep(2000); // 等待2s驗證futureTask.get()是否等待
return sum;
}
}
總和為5050
main線程結束
在JDK源碼中可看到get()方法執行時,會判斷線程狀態如果是未完成,會進入一個無限迴圈,直到任務完成才返回執行結果。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 如果未完成,則等待完成
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// ...
for (; ; ) { // 無線迴圈,直到任務完成
// ...
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// ...
}
}
Future和FutureTask
使用Callable
介面前,需要瞭解Future
和FutureTask
。
在Java併發編程中,Future
介面代表著非同步計算結果。它定義的方法有:
get()
:獲取結果,任務未完成前會一直等待,直到完成;get(long timeout, TimeUnit unit)
:獲取結果,但只等待指定的時間;添加超時時間可以讓調用線程及時釋放,不會死等;cancel(boolean mayInterruptIfRunning)
:取消當前任務;mayInterruptIfRunning
的作用是,當任務在執行中被取消,如果mayInterruptIfRunning == true
就中斷任務,否則不中斷,任務可繼續執行。isCancelled()
:任務在執行完成前被取消,返回true
,否則返回false
;isDone()
:判斷任務是否已完成。任務完成包括:正常完成、拋出異常而完成、任務被取消。
FutureTask
作為Future
的實現類,也有局限性。比如get()
方法會阻塞調用線程;不能將多個非同步計算結果合併到一起等等,針對這些局限,Java8提供了CompletableFuture
。
4.線程池
下麵我將圍繞這幾個問題,來討論一下線程池。
- 線程池是什麼?
- 為什麼使用線程池,或者說使用線程池的好處是什麼?
- 線程池怎麼使用?
- 線程池的原理是什麼,它怎麼做到重覆利用線程的?
線程池是什麼
線程池(Thread Pool)是一種基於池化思想的管理線程的工具,它內部維護了多個線程,目的是能重覆利用線程,控制併發量,降低線程創建及銷毀的資源消耗,提升程式穩定性。
為什麼使用線程池
使用線程池的好處:
- 降低資源消耗:重覆利用已創建的線程,降低線程創建和銷毀造成的損耗。
- 提高響應速度:任務到達時,無需等待線程創建即可立即執行。
- 提高線程的可管理性:線程是稀缺資源,如果無限制創建,不僅會消耗系統資源,還會因為線程的不合理分佈導致資源調度失衡,降低系統的穩定性。使用線程池可以進行統一的分配、調優和監控。
線程池解決的核心問題就是資源管理問題,在併發場景下,系統不能夠確定在任意時刻,有多少任務需要執行,有多少資源需要投入。這種不確定性將帶來以下若幹問題:
- 頻繁申請/銷毀資源和調度資源,將帶來額外的消耗,可能非常巨大。
- 對資源無限申請缺少抑制手段,易引發系統資源耗盡的風險。
- 系統無法合理管理內部的資源分佈,會降低系統的穩定性。
線程池這種基於池化思想的技術就是為瞭解決這類問題。
線程池怎麼使用
線程池的的核心實現類是ThreadPoolExecutor
,調用execute
或者submit
方法即可開啟一個子任務。
public class ThreadPoolTest {
private static ThreadPoolExecutor poolExecutor =
new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable runnableTask = () -> System.out.println("runnable task end");
poolExecutor.execute(runnableTask);
Callable<String> callableTask = () -> "callable task end";
Future<String> future = poolExecutor.submit(callableTask);
System.out.println(future.get());
}
}
ThreadPoolExecutor
的核心構造器有7個參數,我們來分析一下每個參數的含義:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 省略...
}
corePoolSize
:線程池的核心線程數。線程池中的線程數小於corePoolSize
時,直接創建新的線程來執行任務。workQueue
:阻塞隊列。當線程池中的線程數超過corePoolSize
,新任務會被放到隊列中,等待執行。maximumPoolSize
:線程池的最大線程數量。keepAliveTime
:非核心線程空閑時的存活時間。非核心線程即workQueue
滿了之後,再提交任務時創建的線程。非核心線程如果空閑了,超過keepAliveTime
後會被回收。unit
:keepAliveTime
的時間單位。threadFactory
:創建線程的工廠。預設的線程工廠會把提交的任務包裝成一個新的任務。handler
:拒絕策略。當線程池的workQueue
已滿且線程數達到最大線程數時,新提交的任務執行對應的拒絕策略。
JDK也提供了一個快速創建線程池的工具類Executors
,它提供了多種創建線程池的方法,但通常不建議使用Executors
來創建線程池,因為它提供的很多工具方法,要麼使用的阻塞隊列沒有設置邊界,要麼是沒有設置最大線程的上限。任務一多容易發生OOM。實際開發應該根據業務自定義線程池。
線程池的原理
execute
線程池的核心運行機制在於execute
方法,所有的任務調度都是通過execute
方法完成的。
public void execute(Runnable command) {
// ...
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // (1)
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // (2)
int recheck = ctl.get();
// 重新檢查狀態,如果是非運行狀態,接著執行隊列刪除操作,然後執行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果是因為remove(command)刪除隊列元素失敗,再判斷池中線程數量
// 如果池中線程數為0則新增一個任務為null的非核心線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // (3)
reject(command);
}
透過execute
方法的3個if
判斷,可以把它的邏輯梳理為3個部分:
- 第一個
if
:如果線程數量小於核心線程數,則創建一個線程來執行新提交的任務。 - 第二個
if
:如果線程數量大於等於核心線程數,則將任務添加到該阻塞隊列中。 else if
:線程池狀態不對,或者添加到隊列失敗即隊列滿了,則創建一個非核心線程執行新提交的任務。如果非核心線程創建失敗就執行拒絕策略。
addWorker
execute
中的核心邏輯要看addWoker
方法,它承擔了核心線程和非核心線程的創建。addWorker
方法前半部分代碼用一個雙重for迴圈確保線程池狀態正確,後半部分的邏輯是創建一個線程對象Worker
,開啟新線程執行任務的過程。
Worker
是對提交進來的線程的封裝,創建的worker
會被添加到一個HashSet
,線程池中的線程都維護在這個名為workers
的HashSet
中並被線程池所管理。
前面說到,Worker
本身也是一個線程對象,它實現了Runnable
介面,在addWorker
中會啟動一個新的任務,所以我們要看它的run
方法,而run
方法的核心邏輯是runWorker
方法。
final void runWorker(Worker w) {
// ...
try {
while (task != null || (task = getTask()) != null) {
// ...
try {
try {
task.run(); // 執行普通的run方法
} finally {
task = null; // task置空
}
}
}
} finally {
processWorkerExit(w, completedAbruptly); // 回收空閑線程
}
}
可以看到runWorker
方法中有一個while
迴圈,迴圈執行task的run方法,這裡的task就是提交到線程池的任務,它對當成了普通的對象,執行完task.run()
,最後會把task
設置為null
。
再看迴圈的條件,已知task是有可能為空的,所以我們再看看(task = getTask()) != null
這個條件,如果getTask() == null
則跳出迴圈執行processWorkerExit
方法,processWorkerExit
方法的作用是回收空閑線程。
getTask
很多答案都在getTask()
方法中。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (; ; ) { // (1)
// 校驗線程池狀態的代碼,先省略...
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // (2)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // 線程數減1
return null; // 這裡時中斷外層while迴圈的時機
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // (3)
if (r != null)
return r; // 取到值了就在外層的while迴圈中執行任務
timedOut = true; // 否則就標記為獲取隊列任務超時
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
結合(1)、(3)這兩個地方可以看出,getTask()
方法是一個無限迴圈,不斷從阻塞隊列中取任務,取到了任務就返回,到外層runWorker
方法中,執行這個任務的run
方法,即線程池通過啟動一個Worker子線程來執行提交進來的任務,並且一個Worker線程會執行多個任務!
我們再看看getTask()
何時返回null
,因為返回null
才可以看下一步的processWorkerExit
方法。
getTask()
返回null
主要看timed && timedOut
這個條件。變數值timed
為true
的條件是:允許核心線程超時或者線程數大於核心線程數。timedOut
變數為true
的條件是從workQueue
為空了,取不到任務了,但是這個前提是timed == true
,執行workQueue.poll
的時候,因為workQueue.poll
方法獲取任務最多等待keepAliveTime
的時間,超過這個時間獲取不到就返回null
,而workQueue.take()
方法獲取不到任務會一直等待!
因此,在核心線程不會超時的情況下,如果池中的線程數小於核心線程數,這個getTask()會一直迴圈下去,這就是在這種情況下線程池不會自動關閉的原因!反之,在核心線程不會超時的情況下,如果池中的線程數超過核心線程數,才會對多餘的線程回收。如果allowCoreThreadTimeOut == true
,即核心線程也能超時,當阻塞隊列為空,所有Worker
線程都會被回收。
ThreadPoolExecutor
的註釋說,當池中沒有剩餘線程,線程池會自動關閉。
A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically
但我也沒找到證據,沒看到哪裡顯式調用shutdown()
,但確實會自動關閉。
processWorkerExit
getTask()
獲取不到任務後,會執行processWorkerExit
方法回收線程。在這裡,Worker
線程集合隨機刪除一個線程對象,然後再隨機中斷一個workers
中的線程。可見線程銷毀線程的方式時刪除線程引用,讓JVM自動回收。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// ...
try {
workers.remove(w);
}
// 調用interrupt()方法中斷線程,一次中斷一個
tryTerminate();
// ...
}
線程池原理總結
最後我們回到最初的問題,線程池的原理是什麼,線程池怎麼做到重覆利用線程的?
線程池通過維護一組叫Worker
的線程對象來處理任務。線上程數不超過核心線程數的情況下,一個任務對應一個Worker
線程,超過核心線程數,新的任務會提交到阻塞隊列。一個Worker
線程在啟動後,除了執行第一次任務之外,還會不斷向阻塞隊列中消費任務。如果隊列里沒任務了,Worker
線程會一直輪詢,不會退出;只有在池中線程數超過核心線程數時才退出輪詢,然後回收多餘的空閑線程。即一個Worker
線程會處理多個任務,且Worker
線程受線程池管理,不會隨意回收。
線程池的拒絕策略
拒絕策略的目的是保護線程池,避免無節制新增任務。JDK使用RejectedExecutionHandler
介面代表拒絕策略,並提供了4個實現類。線程池的預設拒絕策略是AbortPolicy
,丟棄任務並拋出異常。實際開發中用戶可以通過實現這個介面去定製拒絕策略。
線程的狀態
New
:新創建的線程,尚未執行;Runnable
:運行中的線程,正在執行run()
方法的Java代碼;Blocked
:運行中的線程,因為某些操作被阻塞而掛起;Waiting
:運行中的線程,因為某些操作在等待中;Timed Waiting
:運行中的線程,因為執行sleep()
方法正在計時等待;Terminated
:線程已終止,因為run()
方法執行完畢。
當線程啟動後,它可以在Runnable
、Blocked
、Waiting
和Timed Waiting
這幾個狀態之間切換,直到最後變成Terminated
狀態,線程終止。
線程終止的原因有:
- 線程正常終止:
run()
方法執行到return
語句返回; - 線程意外終止:
run()
方法因為未捕獲的異常導致線程終止; - 對某個線程的
Thread
實例調用stop()
方法強制終止(過時方法,不推薦使用)。
Thread類的常用方法
start()
:啟動當前線程currentThread()
:返回當前代碼執行的線程yield()
: 釋放當前CPU的執行權join()
:join()
方法可以讓其他線程等待,直到自己執行完了,其他線程才繼續執行。setDaemon(boolean on)
:設置守護線程,也叫後臺線程。JVM退出時,不必關心守護線程是否已結束。interrupt()
:中斷線程。sleep(long millis)
:讓線程睡眠指定的毫秒數,在指定時間內,線程是阻塞狀態isAlive()
:判斷當前線程是否存活。
public class ThreadJoinTest {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
System.out.println("hello");
});
System.out.println("start");
t.start();
t.join();
System.out.println("end");
}
}
start
hello
end
volatile
線程間共用變數需要使用volatile
關鍵字標記,確保每個線程都能讀取到更新後的變數值。
為什麼要對線程間共用的變數用關鍵字volatile
聲明?這涉及到Java的記憶體模型(JMM)。
類變數、實例變數是共用變數,方法局部變數是私有變數。共用變數的值保存在主記憶體中,每個線程都有自己的工作記憶體,私有變數就保存在工作記憶體。
在Java虛擬機中,共用變數的值保存在主記憶體中,但是,當線程訪問變數時,它會先獲取一個副本,並保存在自己的工作記憶體中。如果線程修改了變數的值,虛擬機會在某個時刻把修改後的值回寫到主記憶體,但是,這個時間是不確定的!
這會導致如果一個線程更新了某個變數,另一個線程讀取的值可能還是更新前的。例如,主記憶體的變數a = true
,線程1執行a = false
時,它在此刻僅僅是把變數a
的副本變成了false
,主記憶體的變數a
還是true
,在JVM把修改後的a
回寫到主記憶體之前,其他線程讀取到的a
的值仍然是true
,這就造成了多線程之間共用的變數不一致。
因此,volatile
關鍵字的目的是告訴虛擬機:
- 每次訪問變數時,總是獲取主記憶體的最新值;
- 每次修改變數後,立刻回寫到主記憶體。
volatile
關鍵字解決的是可見性問題:當一個線程修改了某個共用變數的值,其他線程能夠立刻看到修改後的值。
但是volatile
不能保證原子性,原子性問題需要根據實際情況做同步處理。
線程同步
什麼叫線程同步?對於多線程的程式來說,同步指的是在一定的時間內只允許某一個線程訪問某個資源。
在Java中,最常見的方法是用synchronized
關鍵字實現同步效果。
synchronized
synchronized
可以修飾實例方法、靜態方法、代碼塊。
synchronized
的底層是使用操作系統的互斥鎖(mutex lock)實現的,它的特點是保證記憶體可見性、操作原子性。
- 記憶體可見性:可見性的原理還要回到Java記憶體模型(上面JMM的那張圖)。 synchronized上鎖時,會清空工作記憶體中變數的值,去主記憶體中獲取該變數的值;解鎖時,會把工作記憶體中變數的值同步回主記憶體中。
- 操作原子性:持有同一個鎖的兩個同步塊只能串列地執行。
使用synchronized
解決了多線程同步訪問共用變數的正確性問題。但是,它的缺點是帶來了性能下降。因為synchronized
代碼塊無法併發執行。此外,加鎖和解鎖需要消耗一定的時間,所以,synchronized
會降低程式的執行效率。
不需要synchronized的操作
JVM規範定義了幾種原子操作:
- 基本類型(
long
和double
除外)賦值,例如:int n = 1
; - 引用類型賦值,例如:
List list = anotherList
。
long
和double
是64位(8位元組)數據,在32位和64位操作系統上是不一樣的。JVM沒有明確規定64位賦值操作是不是一個原子操作,不過在x64平臺的JVM是把long
和double
的賦值作為原子操作實現的。
本文來自博客園,作者:xfcoding,轉載請註明原文鏈接:https://www.cnblogs.com/cloudrich/p/17407804.html