多線程筆記(四) 1. Atomic框架包 Atomic包里放著所以保證線程安全的原子類 大致分為7類 基本數據類型的原子操作類 引用類型的原子操作類 數組類型的原子操作類 修改屬性欄位的原子操作類 帶版本的引用類型的原子操作類 增強的基本數據類型的原子操作類 增強操作的公共輔助類 2. Count ...
多線程筆記(四)
1. Atomic框架包
Atomic包里放著所以保證線程安全的原子類
大致分為7類
- 基本數據類型的原子操作類
- 引用類型的原子操作類
- 數組類型的原子操作類
- 修改屬性欄位的原子操作類
- 帶版本的引用類型的原子操作類
- 增強的基本數據類型的原子操作類
- 增強操作的公共輔助類
2. CountDownLatch
CountDownLatch是一個輔助同步器類,是同步器框架里常用的一個類,主要作用是用於計數用的,類似於倒計時。
同步器框架:用來輔助實現同步功能的一些類。例如:CountDownLatch,CyclicBarrier,Semaphore,Exchanger
CountDownLatch的計數器減為0的時候是不能再次使用的,必須重新再new一個CountDownLatch。
CountDownLatch基於AQS實現,內部使用共用鎖,因為要允許多個線程同時運行。
代碼示例:
public class CountDownLatchTest {
//多線程,統計給定的內容裡面,字母a出現的次數,不區分大小寫
public static void main(String[] args) {
String[] contents = {
"sadasgadnqdhqamdjawqeqa",
"jodasjoqwehmkldasmdqaiwpmclz"
};
int[] results = new int[contents.length];
CountDownLatch cdLatch = new CountDownLatch(contents.length);
//啟動統計的計算線程
for(int i = 0; i < contents.length; i++){
new CountDownWorker(contents,i , results, cdLatch).start();
}
//等待所有的統計的計算線程運行結束過後,才計算最後的統計結果
try {
cdLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//計算最後的統計結果
new CountDownWorkerSum(results).start();
}
}
class CountDownWorkerSum extends Thread{
private int[] results = null;
public CountDownWorkerSum(int[] results){
this.results = results;
}
public void run(){
int sum = 0;
for(int i = 0; i < results.length; i++){
sum += results[i];
}
System.out.println("a總共出現的次數為 " + sum);
}
}
class CountDownWorker extends Thread{
//需要統計的內容
private String[] contents = null;
//本線程需要統計的索引
private int index = 0;
//用引用傳遞的方式,返回統計的結構數據
private int[] results;
private CountDownLatch cdLatch;
public CountDownWorker(String[] contents, int index, int[] results, CountDownLatch cdLatch){
this.contents = contents;
this.index = index;
this.results = results;
this.cdLatch = cdLatch;
}
public void run(){
System.out.println("Thread=" + Thread.currentThread().getName());
String str = contents[index];
str = str.trim();
int count = 0;
for(int i = 1; i <= str.length(); i++){
if("a".equalsIgnoreCase(str.substring(i - 1, i))){
count++;
}
}
System.out.println("第"+(index+1) +"行,共有字母a " + count + " 個");
results[index] = count;
//計數
cdLatch.countDown();
}
}
3. CyclicBarrier
CyclicBarrier,迴圈柵欄,也可稱為迴圈屏障,是一個輔助同步器類,功能和CountDownLatch類似。
使用Condition條件隊列,在沒有滿足要求之前讓線程進行等待。
代碼示例:
public class CyclicBarrierTest {
//多線程,統計給定的內容裡面,字母a出現的次數,不區分大小寫
public static void main(String[] args) {
String[] contents = {
"sadasgadnqdhqamdjawqeqa",
"jodasjoqwehmkldasmdqaiwpmclz"
};
int[] results = new int[contents.length];
//運行完的線程達到了指定數量之後,自動執行CyclicBarrierSum(results)
CyclicBarrier cb = new CyclicBarrier(contents.length, new CyclicBarrierSum(results) );
//啟動統計的計算線程
for(int i = 0; i < contents.length; i++){
new CyclicBarrierWorker(contents,i , results, cb).start();
}
}
}
class CyclicBarrierSum extends Thread{
private int[] results = null;
public CyclicBarrierSum(int[] results){
this.results = results;
}
public void run(){
int sum = 0;
for(int i = 0; i < results.length; i++){
sum += results[i];
}
System.out.println("a總共出現的次數為 " + sum);
}
}
class CyclicBarrierWorker extends Thread{
//需要統計的內容
private String[] contents = null;
//本線程需要統計的索引
private int index = 0;
//用引用傳遞的方式,返回統計的結構數據
private int[] results;
private CyclicBarrier cb;
public CyclicBarrierWorker(String[] contents, int index, int[] results, CyclicBarrier cb){
this.contents = contents;
this.index = index;
this.results = results;
this.cb = cb;
}
public void run(){
System.out.println("Thread=" + Thread.currentThread().getName());
String str = contents[index];
str = str.trim();
int count = 0;
for(int i = 1; i <= str.length(); i++){
if("a".equalsIgnoreCase(str.substring(i - 1, i))){
count++;
}
}
System.out.println("第"+(index+1) +"行,共有字母a " + count + " 個");
results[index] = count;
//到達柵欄,等待
try {
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
4. Semaphore
Semaphore的意思就是信號量,也是一個輔助同步器類
Semaphore支持公平鎖與非公平鎖
Semaphore維護一定數量的“許可證”。當有線程想要訪問共用資源的時候,首先就要去獲得許可,如果許可證不夠了,線程就一直等待,直到獲得許可證。
線程使用完共用資源過後,應該歸還“許可證”,以供其他的線程來獲得許可證。當線程使用完共用資源過後,應該歸還許可證,以供其他的線程來獲得許可證。
代碼示例:
public class SemaphoreTest {
//模擬前面走了一個人,後一個人才能進入地鐵站
public static void main(String[] args) {
//假設地鐵站容量為3,但是要來10倍容量的人
int maxNum = 3;
Semaphore sp = new Semaphore(maxNum);
for(int i = 0; i < maxNum * 10; i++){
new WeAreProgrammer(sp).start();
}
}
}
class WeAreProgrammer extends Thread{
private Semaphore sp = null;
public WeAreProgrammer(Semaphore sp){
this.sp = sp;
}
public void run(){
System.out.println("到達地鐵口,請求獲取進入地鐵站的許可,線程= "+ Thread.currentThread().getName());
//請求得到許可
try {
sp.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("獲取到許可,終於可以進入到地鐵站了,開始等地鐵,線程= "+ Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("上車了,真開心,釋放許可,線程= "+ Thread.currentThread().getName());
//釋放許可
sp.release();
}
}
5. Exchanger
Exchanger是交換器的意思,也是一個輔助同步器類。
Exchanger可用於兩個線程間交換數據(單槽位交換數據)。也支持多個線程間交換數據(多槽位交換數據),但是多少線程交換數據無法保證把某一線程的數據與另一指定線程交換。
代碼示例:
public class ExchangerTest {
//實現兩個線程之間交換數據
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new ExchangerA(exchanger).start();
new ExchangerB(exchanger).start();
}
}
class ExchangerA extends Thread{
//假定交換字元串類型的數據,所有泛型為String
private Exchanger<String> exchanger = null;
public ExchangerA(Exchanger<String> exchanger){
this.exchanger = exchanger;
}
public void run(){
System.out.println("ExchangerA在做之間的業務,產生了數據,線程= "+ Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ExchangerA等待與ExchangerB交換數據");
//交換數據
try {
String exchangeMsg = exchanger.exchange("《這是ExchangerA的信息》");
System.out.println("獲得ExchangerB交換過來的數據=" + exchangeMsg);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ExchangerA接著執行自己的業務");
}
}
class ExchangerB extends Thread{
//假定交換字元串類型的數據,所有泛型為String
private Exchanger<String> exchanger = null;
public ExchangerB(Exchanger<String> exchanger){
this.exchanger = exchanger;
}
public void run(){
System.out.println("ExchangerB在做之間的業務,產生了數據,線程= "+ Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ExchangerB等待與ExchangerB交換數據");
//交換數據
try {
String exchangeMsg = exchanger.exchange("《這是ExchangerB的信息》");
System.out.println("獲得ExchangerA交換過來的數據=" + exchangeMsg);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ExchangerB接著執行自己的業務");
}
}
6. Executors框架
Executors框架是用來對多個線程,按照一定的策略,進行調度,執行和控制的框架。主要用於對線程池的管理
Executor介面
該介面內只有一個方法void execute(Runnable command);
用來在任務創建和任務執行之間進行解耦,用來提交可執行任務
ExecutorService介面
Executor的增強介面,繼承了Executor介面,在Executor的基礎上,增加了幾類實用的方法。提供對線程池聲明周期的管理,增加了對非同步任務,批處理任務的支持。
-
關閉執行器:
void shutdown();
:在調用該方法的時候,已經提交給執行器的任務會繼續執行,不會接受新任務的提交List<Runnable> shutdownNow();
:不會接受新任務的提交,也會嘗試中斷現在正在執行的任務(需要任務支持響應中斷)
-
監視執行器的狀態:
boolean isShutdown();
:如果執行器關閉了,返回trueboolean isTerminated();
:如果執行器終止(執行器關閉,並且所有的任務都已經完成)了,返回true
-
提供了對非同步任務的支撐:
<T> Future<T> submit(Callable<T> task);
:提交有返回值的任務<T> Future<T> submit(Runnable task, T result);
:提交任務,result指向返回值,可在外部通過get獲得Future<?> submit(Runnable task);
:提交沒有返回值的任務
-
提供了對批處理任務的支持:
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
:執行任務集合,返回Future集合<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:執行任務集合,返回Future集合,加入了超時限制<T> T invokeAny(Collection<? extends Callable<T>> tasks)
:隨機執行任務集合中的任務,有一個任務執行完畢直接返回,同時終止其他沒有執行完畢的任務。<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:隨機執行任務集合中的任務,有一個任務執行完畢直接返回,同時終止其他沒有執行完畢的任務,加入了超時限制。
ScheduledExecutorService介面
繼承了ExecutorService介面,用來定時執行或者周期性執行任務。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
:提交一個任務,延時一段時間執行<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
:提交一個任務,延時一段時間執行ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
:提交一個任務,時間等於或超過time首次執行任務,之後每隔period*unit重覆執行任務。ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
:創建並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
Executors
Executors用於輔助創建Executor介面的實現類的實例
Executors是一個簡單工廠,大體包括
-
創建固定線程數的線程池
-
ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
-
-
創建可緩存的線程池
-
ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
-
創建可延時,可周期執行的線程池
-
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
-
-
創建單個線程數的線程池
-
ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
-
-
創建Fork/Join框架
-
ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
-
ThreadPoolExecutor
ThreadPoolExecutor繼承AbstractExecutorService類,而AbstractExecutorService又實現了ExecutorService介面
為什麼引入線程池:
- 減少因為頻繁創建和銷毀檢查所帶來的開銷
- 提高程式的響應速度
- 自動管理線程,調度任務的執行,使得調用者只用關註任務的創建
構造方法參數解釋
線程池的構造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
參數
corePoolSize:核心線程池數量
maximumPoolSize:線程池最大線程數量(核心線程池數量+非核心線程池數量)
keepAliveTime:非核心線程存活時間
unit:存活時間的單位
workQueue:任務隊列,當線程池的線程滿了之後,任務會進入任務隊列等待提交
handler:拒絕策略
拒絕策略
有兩種情況會執行拒絕策略:
- 核心線程池滿了,任務隊列也滿了,同時非核心線程池也滿了,這個時候再提交新的任務就會拒絕。
- 提交任務的時候,線程池關閉了。
一共有四種拒絕策略:
- AbortPolicy:拋出一個異常
- DiscardPolicy:什麼都不做,等任務自己被回收
- DiscardOldestPolicy:丟棄隊列中最先入隊的一個任務,並執行當前任務
- CallerRunsPolicy:用調用者的線程來執行任務
線程池狀態
用AtomicInteger類型的變數ctl來表示線程池的狀態,高3位保存線程池的狀態,低29位保存線程的數量
一共五種狀態:
-
RUNNING(-1):接受新任務,而且會處理已經進入阻塞隊列的任務
-
SHUTDOWN(0):不接受新任務,但會處理已經進入阻塞隊列的任務
-
STOP(1):不再接受新的任務,而且不再處理已經進入阻塞隊列的任務,同時會中斷正在運行的任務
-
TIDYING(2):所有任務都已經終止,工作線程數量為0時,準備調用terminated方法
-
TERMINATED(3):terminated方法已經執行完成
線程池狀態變化示意圖:
7. Future介面
Future模式是多線程中一種常見的設計模式,用來實現非同步執行任務。調用方拿到的是憑證,拿到憑證之後就可以去處理其他的任務,在需要使用結果的時候再使用憑證還獲取調用結果。Future介面的具體實現類是FutureTask
FutureTask
狀態:
- NEW(0):表示任務的初始化狀態
- COMPLETING(1):表示任務已經執行完成,屬於中間狀態
- NORMAL(2):表示任務正常完成,屬於最終狀態
- EXCEPTIONAL(3):表示任務異常完成,屬於最終狀態
- CANCELLED(4):表示任務還沒有開始執行就被取消了(非中斷方式),屬於最終狀態
- INTERRUPTING(5):表示任務還沒有開始執行就被取消了(中斷方式),屬於中間狀態
- INTERRUPTED(6):表示任務還沒有開始執行就被取消了(中斷方式),已經被中斷,屬於最終狀態
狀態變化示意圖