多線程筆記(四)

来源:https://www.cnblogs.com/xuzhuo123/archive/2022/05/16/16277040.html
-Advertisement-
Play Games

多線程筆記(四) 1. Atomic框架包 Atomic包里放著所以保證線程安全的原子類 大致分為7類 基本數據類型的原子操作類 引用類型的原子操作類 數組類型的原子操作類 修改屬性欄位的原子操作類 帶版本的引用類型的原子操作類 增強的基本數據類型的原子操作類 增強操作的公共輔助類 2. Count ...


多線程筆記(四)

1. Atomic框架包

Atomic包里放著所以保證線程安全的原子類

大致分為7類

  1. 基本數據類型的原子操作類
  2. 引用類型的原子操作類
  3. 數組類型的原子操作類
  4. 修改屬性欄位的原子操作類
  5. 帶版本的引用類型的原子操作類
  6. 增強的基本數據類型的原子操作類
  7. 增強操作的公共輔助類

2. CountDownLatch

CountDownLatch是一個輔助同步器類,是同步器框架里常用的一個類,主要作用是用於計數用的,類似於倒計時。

同步器框架:用來輔助實現同步功能的一些類。例如:CountDownLatch,CyclicBarrier,Semaphore,Exchanger

CountDownLatch的計數器減為0的時候是不能再次使用的,必須重新再new一個CountDownLatch。

CountDownLatch基於AQS實現,內部使用共用鎖,因為要允許多個線程同時運行。

代碼示例:

public class CountDownLatchTest {
    //多線程,統計給定的內容裡面,字母a出現的次數,不區分大小寫
    public static void main(String[] args) {
        String[] contents = {
                "sadasgadnqdhqamdjawqeqa",
                "jodasjoqwehmkldasmdqaiwpmclz"
        };
        int[] results = new int[contents.length];
        CountDownLatch cdLatch = new CountDownLatch(contents.length);

        //啟動統計的計算線程
        for(int i = 0; i < contents.length; i++){
            new CountDownWorker(contents,i , results, cdLatch).start();
        }
        //等待所有的統計的計算線程運行結束過後,才計算最後的統計結果
        try {
            cdLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //計算最後的統計結果
        new CountDownWorkerSum(results).start();
    }
}
class CountDownWorkerSum extends Thread{
    private int[] results = null;
    public CountDownWorkerSum(int[] results){
        this.results = results;
    }
    public void run(){
        int sum = 0;
        for(int i = 0; i < results.length; i++){
            sum += results[i];
        }
        System.out.println("a總共出現的次數為 " + sum);
    }
}

class CountDownWorker extends Thread{
    //需要統計的內容
    private String[] contents = null;
    //本線程需要統計的索引
    private int index = 0;
    //用引用傳遞的方式,返回統計的結構數據
    private int[] results;
    private CountDownLatch cdLatch;
    public CountDownWorker(String[] contents, int index, int[] results, CountDownLatch cdLatch){
        this.contents = contents;
        this.index = index;
        this.results = results;
        this.cdLatch = cdLatch;
    }
    public void run(){
        System.out.println("Thread=" + Thread.currentThread().getName());
        String str = contents[index];
        str = str.trim();

        int count = 0;
        for(int i = 1; i <= str.length(); i++){
            if("a".equalsIgnoreCase(str.substring(i - 1, i))){
                count++;
            }
        }
        System.out.println("第"+(index+1) +"行,共有字母a " + count + " 個");
        results[index] = count;
        //計數
        cdLatch.countDown();
    }
}

3. CyclicBarrier

CyclicBarrier,迴圈柵欄,也可稱為迴圈屏障,是一個輔助同步器類,功能和CountDownLatch類似。

使用Condition條件隊列,在沒有滿足要求之前讓線程進行等待。

代碼示例:

public class CyclicBarrierTest {
    //多線程,統計給定的內容裡面,字母a出現的次數,不區分大小寫
    public static void main(String[] args) {
        String[] contents = {
                "sadasgadnqdhqamdjawqeqa",
                "jodasjoqwehmkldasmdqaiwpmclz"
        };
        int[] results = new int[contents.length];
        //運行完的線程達到了指定數量之後,自動執行CyclicBarrierSum(results)
        CyclicBarrier cb = new CyclicBarrier(contents.length, new CyclicBarrierSum(results) );

        //啟動統計的計算線程
        for(int i = 0; i < contents.length; i++){
            new CyclicBarrierWorker(contents,i , results, cb).start();
        }
    }
}

class CyclicBarrierSum extends Thread{
    private int[] results = null;
    public CyclicBarrierSum(int[] results){
        this.results = results;
    }
    public void run(){
        int sum = 0;
        for(int i = 0; i < results.length; i++){
            sum += results[i];
        }
        System.out.println("a總共出現的次數為 " + sum);
    }
}

class CyclicBarrierWorker extends Thread{
    //需要統計的內容
    private String[] contents = null;
    //本線程需要統計的索引
    private int index = 0;
    //用引用傳遞的方式,返回統計的結構數據
    private int[] results;
    private CyclicBarrier cb;
    public CyclicBarrierWorker(String[] contents, int index, int[] results, CyclicBarrier cb){
        this.contents = contents;
        this.index = index;
        this.results = results;
        this.cb = cb;
    }
    public void run(){
        System.out.println("Thread=" + Thread.currentThread().getName());
        String str = contents[index];
        str = str.trim();

        int count = 0;
        for(int i = 1; i <= str.length(); i++){
            if("a".equalsIgnoreCase(str.substring(i - 1, i))){
                count++;
            }
        }
        System.out.println("第"+(index+1) +"行,共有字母a " + count + " 個");
        results[index] = count;

        //到達柵欄,等待
        try {
            cb.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

4. Semaphore

Semaphore的意思就是信號量,也是一個輔助同步器類

Semaphore支持公平鎖與非公平鎖

Semaphore維護一定數量的“許可證”。當有線程想要訪問共用資源的時候,首先就要去獲得許可,如果許可證不夠了,線程就一直等待,直到獲得許可證。

線程使用完共用資源過後,應該歸還“許可證”,以供其他的線程來獲得許可證。當線程使用完共用資源過後,應該歸還許可證,以供其他的線程來獲得許可證。

代碼示例:

public class SemaphoreTest {
    //模擬前面走了一個人,後一個人才能進入地鐵站
    public static void main(String[] args) {
        //假設地鐵站容量為3,但是要來10倍容量的人
        int maxNum = 3;
        Semaphore sp = new Semaphore(maxNum);

        for(int i = 0; i < maxNum * 10; i++){
            new WeAreProgrammer(sp).start();
        }
    }

}

class WeAreProgrammer extends Thread{
    private Semaphore sp = null;
    public WeAreProgrammer(Semaphore sp){
        this.sp = sp;
    }
    public void run(){
        System.out.println("到達地鐵口,請求獲取進入地鐵站的許可,線程= "+ Thread.currentThread().getName());
        //請求得到許可
        try {
            sp.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("獲取到許可,終於可以進入到地鐵站了,開始等地鐵,線程= "+ Thread.currentThread().getName());
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("上車了,真開心,釋放許可,線程= "+ Thread.currentThread().getName());
        //釋放許可
        sp.release();
    }
}

5. Exchanger

Exchanger是交換器的意思,也是一個輔助同步器類。

Exchanger可用於兩個線程間交換數據(單槽位交換數據)。也支持多個線程間交換數據(多槽位交換數據),但是多少線程交換數據無法保證把某一線程的數據與另一指定線程交換。

代碼示例

public class ExchangerTest {
    //實現兩個線程之間交換數據
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        new ExchangerA(exchanger).start();
        new ExchangerB(exchanger).start();
    }
}

class ExchangerA extends Thread{
    //假定交換字元串類型的數據,所有泛型為String
    private Exchanger<String> exchanger = null;
    public ExchangerA(Exchanger<String> exchanger){
        this.exchanger = exchanger;
    }
    public void run(){
        System.out.println("ExchangerA在做之間的業務,產生了數據,線程= "+ Thread.currentThread().getName());
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ExchangerA等待與ExchangerB交換數據");
        //交換數據
        try {
            String exchangeMsg = exchanger.exchange("《這是ExchangerA的信息》");
            System.out.println("獲得ExchangerB交換過來的數據=" + exchangeMsg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ExchangerA接著執行自己的業務");
    }
}

class ExchangerB extends Thread{
    //假定交換字元串類型的數據,所有泛型為String
    private Exchanger<String> exchanger = null;
    public ExchangerB(Exchanger<String> exchanger){
        this.exchanger = exchanger;
    }
    public void run(){
        System.out.println("ExchangerB在做之間的業務,產生了數據,線程= "+ Thread.currentThread().getName());
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ExchangerB等待與ExchangerB交換數據");
        //交換數據
        try {
            String exchangeMsg = exchanger.exchange("《這是ExchangerB的信息》");
            System.out.println("獲得ExchangerA交換過來的數據=" + exchangeMsg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ExchangerB接著執行自己的業務");
    }
}

6. Executors框架

Executors框架是用來對多個線程,按照一定的策略,進行調度,執行和控制的框架。主要用於對線程池的管理

Executor介面

該介面內只有一個方法void execute(Runnable command);

用來在任務創建和任務執行之間進行解耦,用來提交可執行任務

ExecutorService介面

Executor的增強介面,繼承了Executor介面,在Executor的基礎上,增加了幾類實用的方法。提供對線程池聲明周期的管理,增加了對非同步任務,批處理任務的支持。

  • 關閉執行器:

    • void shutdown();:在調用該方法的時候,已經提交給執行器的任務會繼續執行,不會接受新任務的提交
    • List<Runnable> shutdownNow();:不會接受新任務的提交,也會嘗試中斷現在正在執行的任務(需要任務支持響應中斷)
  • 監視執行器的狀態:

    • boolean isShutdown();:如果執行器關閉了,返回true
    • boolean isTerminated();:如果執行器終止(執行器關閉,並且所有的任務都已經完成)了,返回true
  • 提供了對非同步任務的支撐:

    • <T> Future<T> submit(Callable<T> task);:提交有返回值的任務
    • <T> Future<T> submit(Runnable task, T result);:提交任務,result指向返回值,可在外部通過get獲得
    • Future<?> submit(Runnable task);:提交沒有返回值的任務
  • 提供了對批處理任務的支持:

    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):執行任務集合,返回Future集合
    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):執行任務集合,返回Future集合,加入了超時限制
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks):隨機執行任務集合中的任務,有一個任務執行完畢直接返回,同時終止其他沒有執行完畢的任務。
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):隨機執行任務集合中的任務,有一個任務執行完畢直接返回,同時終止其他沒有執行完畢的任務,加入了超時限制。

ScheduledExecutorService介面

繼承了ExecutorService介面,用來定時執行或者周期性執行任務。

  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);:提交一個任務,延時一段時間執行
  • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);:提交一個任務,延時一段時間執行
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);:提交一個任務,時間等於或超過time首次執行任務,之後每隔period*unit重覆執行任務。
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);:創建並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。

Executors

Executors用於輔助創建Executor介面的實現類的實例

Executors是一個簡單工廠,大體包括

  • 創建固定線程數的線程池

    • ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());
      }
      
  • 創建可緩存的線程池

    • ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
      
  • 創建可延時,可周期執行的線程池

    • ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
          return new ScheduledThreadPoolExecutor(corePoolSize);
      }
      
  • 創建單個線程數的線程池

    • ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()));
      }
      
  • 創建Fork/Join框架

    • ExecutorService newWorkStealingPool() {
          return new ForkJoinPool
              (Runtime.getRuntime().availableProcessors(),
               ForkJoinPool.defaultForkJoinWorkerThreadFactory,
               null, true);
      }
      

ThreadPoolExecutor

ThreadPoolExecutor繼承AbstractExecutorService類,而AbstractExecutorService又實現了ExecutorService介面

為什麼引入線程池:

  • 減少因為頻繁創建和銷毀檢查所帶來的開銷
  • 提高程式的響應速度
  • 自動管理線程,調度任務的執行,使得調用者只用關註任務的創建

構造方法參數解釋

線程池的構造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

參數

corePoolSize:核心線程池數量

maximumPoolSize:線程池最大線程數量(核心線程池數量+非核心線程池數量)

keepAliveTime:非核心線程存活時間

unit:存活時間的單位

workQueue:任務隊列,當線程池的線程滿了之後,任務會進入任務隊列等待提交

handler:拒絕策略

拒絕策略

有兩種情況會執行拒絕策略:

  • 核心線程池滿了,任務隊列也滿了,同時非核心線程池也滿了,這個時候再提交新的任務就會拒絕。
  • 提交任務的時候,線程池關閉了。

一共有四種拒絕策略:

  • AbortPolicy:拋出一個異常
  • DiscardPolicy:什麼都不做,等任務自己被回收
  • DiscardOldestPolicy:丟棄隊列中最先入隊的一個任務,並執行當前任務
  • CallerRunsPolicy:用調用者的線程來執行任務

線程池狀態

用AtomicInteger類型的變數ctl來表示線程池的狀態,高3位保存線程池的狀態,低29位保存線程的數量

一共五種狀態:

  • RUNNING(-1):接受新任務,而且會處理已經進入阻塞隊列的任務

  • SHUTDOWN(0):不接受新任務,但會處理已經進入阻塞隊列的任務

  • STOP(1):不再接受新的任務,而且不再處理已經進入阻塞隊列的任務,同時會中斷正在運行的任務

  • TIDYING(2):所有任務都已經終止,工作線程數量為0時,準備調用terminated方法

  • TERMINATED(3):terminated方法已經執行完成

線程池狀態變化示意圖:

7. Future介面

Future模式是多線程中一種常見的設計模式,用來實現非同步執行任務。調用方拿到的是憑證,拿到憑證之後就可以去處理其他的任務,在需要使用結果的時候再使用憑證還獲取調用結果。Future介面的具體實現類是FutureTask

FutureTask

狀態:

  • NEW(0):表示任務的初始化狀態
  • COMPLETING(1):表示任務已經執行完成,屬於中間狀態
  • NORMAL(2):表示任務正常完成,屬於最終狀態
  • EXCEPTIONAL(3):表示任務異常完成,屬於最終狀態
  • CANCELLED(4):表示任務還沒有開始執行就被取消了(非中斷方式),屬於最終狀態
  • INTERRUPTING(5):表示任務還沒有開始執行就被取消了(中斷方式),屬於中間狀態
  • INTERRUPTED(6):表示任務還沒有開始執行就被取消了(中斷方式),已經被中斷,屬於最終狀態

狀態變化示意圖


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在跟著尚矽谷的視頻學習Vue腳手架時,發現main.js文件中的格式同視頻中的完全不一樣,內容倒是可以看懂,但因為初學的緣故,還是照貓畫虎的操作一遍為好,但是在編寫render的時候始終不行,百度半天,沒有找到解決方法,然後思考一下,既然預設實例模版不一樣,那麼,是不是在創建工程時創建成Vue3版本 ...
  • Servlet就是一個介面我們需要寫一個類然後去實現Servlet,就可以被伺服器識別到。request是用來接受客戶端傳過來的參數,respone是用來響應客戶端的頁面。我們所用的容器是一個繼承的java容器tomcat。 ...
  • 前言 最近我小表妹迷上了玩連連看,玩了一個星期了還沒通關,真的是菜。 我實在是看不過去了,直接用python寫了個腳本代碼,一分鐘一把游戲。 快是快,就是聯網玩容易被罵,嘿嘿~ 但是,又不是我玩,有什麼關係呢~ 哈哈哈 😎 代碼 導入所需模塊 # -*- coding:utf-8 -*- impo ...
  • sort包簡介 官方文檔 Golang的sort包用來排序,二分查找等操作。本文主要介紹sort包里常用的函數,通過實例代碼來快速學會使用sort包 sort包內置函數 sort.Ints(x []int) ints := []int{1, 4, 3, 2} fmt.Printf("%v\n", i ...
  • 本文mybatis-spring-boot探討在springboot工程中mybatis相關對象的註冊與載入。 建議先瞭解mybatis在spring中的使用和springboot自動裝載機制,再看此文章。 傳送門:Mybatis源碼解讀-配置載入和Mapper的生成 問題 @MapperScan和 ...
  • 一個工作了7年的粉絲,遇到了一個Zookeeper的問題。 因為接觸過Zookeeper這個技術,不知道該怎麼回答。 我說一個工作了7年的程式員,沒有接觸過主流技術,這不正常。 於是我問了他工資以後,我理解了! 好吧,關於“Zookeeper中Watch機制的實現原理”,看看普通人和高手的回答。 普 ...
  • 作者:何甜甜在嗎 鏈接:https://juejin.cn/post/6916150628955717646 寫在前面 在介紹具體方案之前,首先先介紹一下常見的加密演算法。加密演算法可以分為三大類: 對稱加密演算法 非對稱加密演算法 Hash演算法 對稱加密演算法 加密和解密使用相同的密鑰。對稱加密演算法加密解密 ...
  • RocketMQ學習 1.基本概念 RocketMQ是阿裡巴巴團隊使用java語言開發的一款分散式消息中間件,是一款低延遲,高可用,擁有海量消息堆積能力和靈活拓展性的消息隊列。 rocketmq的官網:http://rocketmq.apache.org gitee倉庫:https://gitee. ...
一周排行
    -Advertisement-
    Play Games
  • .Net8.0 Blazor Hybird 桌面端 (WPF/Winform) 實測可以完整運行在 win7sp1/win10/win11. 如果用其他工具打包,還可以運行在mac/linux下, 傳送門BlazorHybrid 發佈為無依賴包方式 安裝 WebView2Runtime 1.57 M ...
  • 目錄前言PostgreSql安裝測試額外Nuget安裝Person.cs模擬運行Navicate連postgresql解決方案Garnet為什麼要選擇Garnet而不是RedisRedis不再開源Windows版的Redis是由微軟維護的Windows Redis版本老舊,後續可能不再更新Garne ...
  • C#TMS系統代碼-聯表報表學習 領導被裁了之後很快就有人上任了,幾乎是無縫銜接,很難讓我不想到這早就決定好了。我的職責沒有任何變化。感受下來這個系統封裝程度很高,我只要會調用方法就行。這個系統交付之後不會有太多問題,更多應該是做小需求,有大的開發任務應該也是第二期的事,嗯?怎麼感覺我變成運維了?而 ...
  • 我在隨筆《EAV模型(實體-屬性-值)的設計和低代碼的處理方案(1)》中介紹了一些基本的EAV模型設計知識和基於Winform場景下低代碼(或者說無代碼)的一些實現思路,在本篇隨筆中,我們來分析一下這種針對通用業務,且只需定義就能構建業務模塊存儲和界面的解決方案,其中的數據查詢處理的操作。 ...
  • 對某個遠程伺服器啟用和設置NTP服務(Windows系統) 打開註冊表 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\TimeProviders\NtpServer 將 Enabled 的值設置為 1,這將啟用NTP伺服器功 ...
  • title: Django信號與擴展:深入理解與實踐 date: 2024/5/15 22:40:52 updated: 2024/5/15 22:40:52 categories: 後端開發 tags: Django 信號 松耦合 觀察者 擴展 安全 性能 第一部分:Django信號基礎 Djan ...
  • 使用xadmin2遇到的問題&解決 環境配置: 使用的模塊版本: 關聯的包 Django 3.2.15 mysqlclient 2.2.4 xadmin 2.0.1 django-crispy-forms >= 1.6.0 django-import-export >= 0.5.1 django-r ...
  • 今天我打算整點兒不一樣的內容,通過之前學習的TransformerMap和LazyMap鏈,想搞點不一樣的,所以我關註了另外一條鏈DefaultedMap鏈,主要調用鏈為: 調用鏈詳細描述: ObjectInputStream.readObject() DefaultedMap.readObject ...
  • 後端應用級開發者該如何擁抱 AI GC?就是在這樣的一個大的浪潮下,我們的傳統的應用級開發者。我們該如何選擇職業或者是如何去快速轉型,跟上這樣的一個行業的一個浪潮? 0 AI金字塔模型 越往上它的整個難度就是職業機會也好,或者說是整個的這個運作也好,它的難度會越大,然後越往下機會就會越多,所以這是一 ...
  • @Autowired是Spring框架提供的註解,@Resource是Java EE 5規範提供的註解。 @Autowired預設按照類型自動裝配,而@Resource預設按照名稱自動裝配。 @Autowired支持@Qualifier註解來指定裝配哪一個具有相同類型的bean,而@Resourc... ...