多線程筆記(四)

来源:https://www.cnblogs.com/xuzhuo123/archive/2022/05/16/16277040.html
-Advertisement-
Play Games

多線程筆記(四) 1. Atomic框架包 Atomic包里放著所以保證線程安全的原子類 大致分為7類 基本數據類型的原子操作類 引用類型的原子操作類 數組類型的原子操作類 修改屬性欄位的原子操作類 帶版本的引用類型的原子操作類 增強的基本數據類型的原子操作類 增強操作的公共輔助類 2. Count ...


多線程筆記(四)

1. Atomic框架包

Atomic包里放著所以保證線程安全的原子類

大致分為7類

  1. 基本數據類型的原子操作類
  2. 引用類型的原子操作類
  3. 數組類型的原子操作類
  4. 修改屬性欄位的原子操作類
  5. 帶版本的引用類型的原子操作類
  6. 增強的基本數據類型的原子操作類
  7. 增強操作的公共輔助類

2. CountDownLatch

CountDownLatch是一個輔助同步器類,是同步器框架里常用的一個類,主要作用是用於計數用的,類似於倒計時。

同步器框架:用來輔助實現同步功能的一些類。例如:CountDownLatch,CyclicBarrier,Semaphore,Exchanger

CountDownLatch的計數器減為0的時候是不能再次使用的,必須重新再new一個CountDownLatch。

CountDownLatch基於AQS實現,內部使用共用鎖,因為要允許多個線程同時運行。

代碼示例:

public class CountDownLatchTest {
    //多線程,統計給定的內容裡面,字母a出現的次數,不區分大小寫
    public static void main(String[] args) {
        String[] contents = {
                "sadasgadnqdhqamdjawqeqa",
                "jodasjoqwehmkldasmdqaiwpmclz"
        };
        int[] results = new int[contents.length];
        CountDownLatch cdLatch = new CountDownLatch(contents.length);

        //啟動統計的計算線程
        for(int i = 0; i < contents.length; i++){
            new CountDownWorker(contents,i , results, cdLatch).start();
        }
        //等待所有的統計的計算線程運行結束過後,才計算最後的統計結果
        try {
            cdLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //計算最後的統計結果
        new CountDownWorkerSum(results).start();
    }
}
class CountDownWorkerSum extends Thread{
    private int[] results = null;
    public CountDownWorkerSum(int[] results){
        this.results = results;
    }
    public void run(){
        int sum = 0;
        for(int i = 0; i < results.length; i++){
            sum += results[i];
        }
        System.out.println("a總共出現的次數為 " + sum);
    }
}

class CountDownWorker extends Thread{
    //需要統計的內容
    private String[] contents = null;
    //本線程需要統計的索引
    private int index = 0;
    //用引用傳遞的方式,返回統計的結構數據
    private int[] results;
    private CountDownLatch cdLatch;
    public CountDownWorker(String[] contents, int index, int[] results, CountDownLatch cdLatch){
        this.contents = contents;
        this.index = index;
        this.results = results;
        this.cdLatch = cdLatch;
    }
    public void run(){
        System.out.println("Thread=" + Thread.currentThread().getName());
        String str = contents[index];
        str = str.trim();

        int count = 0;
        for(int i = 1; i <= str.length(); i++){
            if("a".equalsIgnoreCase(str.substring(i - 1, i))){
                count++;
            }
        }
        System.out.println("第"+(index+1) +"行,共有字母a " + count + " 個");
        results[index] = count;
        //計數
        cdLatch.countDown();
    }
}

3. CyclicBarrier

CyclicBarrier,迴圈柵欄,也可稱為迴圈屏障,是一個輔助同步器類,功能和CountDownLatch類似。

使用Condition條件隊列,在沒有滿足要求之前讓線程進行等待。

代碼示例:

public class CyclicBarrierTest {
    //多線程,統計給定的內容裡面,字母a出現的次數,不區分大小寫
    public static void main(String[] args) {
        String[] contents = {
                "sadasgadnqdhqamdjawqeqa",
                "jodasjoqwehmkldasmdqaiwpmclz"
        };
        int[] results = new int[contents.length];
        //運行完的線程達到了指定數量之後,自動執行CyclicBarrierSum(results)
        CyclicBarrier cb = new CyclicBarrier(contents.length, new CyclicBarrierSum(results) );

        //啟動統計的計算線程
        for(int i = 0; i < contents.length; i++){
            new CyclicBarrierWorker(contents,i , results, cb).start();
        }
    }
}

class CyclicBarrierSum extends Thread{
    private int[] results = null;
    public CyclicBarrierSum(int[] results){
        this.results = results;
    }
    public void run(){
        int sum = 0;
        for(int i = 0; i < results.length; i++){
            sum += results[i];
        }
        System.out.println("a總共出現的次數為 " + sum);
    }
}

class CyclicBarrierWorker extends Thread{
    //需要統計的內容
    private String[] contents = null;
    //本線程需要統計的索引
    private int index = 0;
    //用引用傳遞的方式,返回統計的結構數據
    private int[] results;
    private CyclicBarrier cb;
    public CyclicBarrierWorker(String[] contents, int index, int[] results, CyclicBarrier cb){
        this.contents = contents;
        this.index = index;
        this.results = results;
        this.cb = cb;
    }
    public void run(){
        System.out.println("Thread=" + Thread.currentThread().getName());
        String str = contents[index];
        str = str.trim();

        int count = 0;
        for(int i = 1; i <= str.length(); i++){
            if("a".equalsIgnoreCase(str.substring(i - 1, i))){
                count++;
            }
        }
        System.out.println("第"+(index+1) +"行,共有字母a " + count + " 個");
        results[index] = count;

        //到達柵欄,等待
        try {
            cb.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

4. Semaphore

Semaphore的意思就是信號量,也是一個輔助同步器類

Semaphore支持公平鎖與非公平鎖

Semaphore維護一定數量的“許可證”。當有線程想要訪問共用資源的時候,首先就要去獲得許可,如果許可證不夠了,線程就一直等待,直到獲得許可證。

線程使用完共用資源過後,應該歸還“許可證”,以供其他的線程來獲得許可證。當線程使用完共用資源過後,應該歸還許可證,以供其他的線程來獲得許可證。

代碼示例:

public class SemaphoreTest {
    //模擬前面走了一個人,後一個人才能進入地鐵站
    public static void main(String[] args) {
        //假設地鐵站容量為3,但是要來10倍容量的人
        int maxNum = 3;
        Semaphore sp = new Semaphore(maxNum);

        for(int i = 0; i < maxNum * 10; i++){
            new WeAreProgrammer(sp).start();
        }
    }

}

class WeAreProgrammer extends Thread{
    private Semaphore sp = null;
    public WeAreProgrammer(Semaphore sp){
        this.sp = sp;
    }
    public void run(){
        System.out.println("到達地鐵口,請求獲取進入地鐵站的許可,線程= "+ Thread.currentThread().getName());
        //請求得到許可
        try {
            sp.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("獲取到許可,終於可以進入到地鐵站了,開始等地鐵,線程= "+ Thread.currentThread().getName());
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("上車了,真開心,釋放許可,線程= "+ Thread.currentThread().getName());
        //釋放許可
        sp.release();
    }
}

5. Exchanger

Exchanger是交換器的意思,也是一個輔助同步器類。

Exchanger可用於兩個線程間交換數據(單槽位交換數據)。也支持多個線程間交換數據(多槽位交換數據),但是多少線程交換數據無法保證把某一線程的數據與另一指定線程交換。

代碼示例

public class ExchangerTest {
    //實現兩個線程之間交換數據
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        new ExchangerA(exchanger).start();
        new ExchangerB(exchanger).start();
    }
}

class ExchangerA extends Thread{
    //假定交換字元串類型的數據,所有泛型為String
    private Exchanger<String> exchanger = null;
    public ExchangerA(Exchanger<String> exchanger){
        this.exchanger = exchanger;
    }
    public void run(){
        System.out.println("ExchangerA在做之間的業務,產生了數據,線程= "+ Thread.currentThread().getName());
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ExchangerA等待與ExchangerB交換數據");
        //交換數據
        try {
            String exchangeMsg = exchanger.exchange("《這是ExchangerA的信息》");
            System.out.println("獲得ExchangerB交換過來的數據=" + exchangeMsg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ExchangerA接著執行自己的業務");
    }
}

class ExchangerB extends Thread{
    //假定交換字元串類型的數據,所有泛型為String
    private Exchanger<String> exchanger = null;
    public ExchangerB(Exchanger<String> exchanger){
        this.exchanger = exchanger;
    }
    public void run(){
        System.out.println("ExchangerB在做之間的業務,產生了數據,線程= "+ Thread.currentThread().getName());
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ExchangerB等待與ExchangerB交換數據");
        //交換數據
        try {
            String exchangeMsg = exchanger.exchange("《這是ExchangerB的信息》");
            System.out.println("獲得ExchangerA交換過來的數據=" + exchangeMsg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ExchangerB接著執行自己的業務");
    }
}

6. Executors框架

Executors框架是用來對多個線程,按照一定的策略,進行調度,執行和控制的框架。主要用於對線程池的管理

Executor介面

該介面內只有一個方法void execute(Runnable command);

用來在任務創建和任務執行之間進行解耦,用來提交可執行任務

ExecutorService介面

Executor的增強介面,繼承了Executor介面,在Executor的基礎上,增加了幾類實用的方法。提供對線程池聲明周期的管理,增加了對非同步任務,批處理任務的支持。

  • 關閉執行器:

    • void shutdown();:在調用該方法的時候,已經提交給執行器的任務會繼續執行,不會接受新任務的提交
    • List<Runnable> shutdownNow();:不會接受新任務的提交,也會嘗試中斷現在正在執行的任務(需要任務支持響應中斷)
  • 監視執行器的狀態:

    • boolean isShutdown();:如果執行器關閉了,返回true
    • boolean isTerminated();:如果執行器終止(執行器關閉,並且所有的任務都已經完成)了,返回true
  • 提供了對非同步任務的支撐:

    • <T> Future<T> submit(Callable<T> task);:提交有返回值的任務
    • <T> Future<T> submit(Runnable task, T result);:提交任務,result指向返回值,可在外部通過get獲得
    • Future<?> submit(Runnable task);:提交沒有返回值的任務
  • 提供了對批處理任務的支持:

    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks):執行任務集合,返回Future集合
    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):執行任務集合,返回Future集合,加入了超時限制
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks):隨機執行任務集合中的任務,有一個任務執行完畢直接返回,同時終止其他沒有執行完畢的任務。
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):隨機執行任務集合中的任務,有一個任務執行完畢直接返回,同時終止其他沒有執行完畢的任務,加入了超時限制。

ScheduledExecutorService介面

繼承了ExecutorService介面,用來定時執行或者周期性執行任務。

  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);:提交一個任務,延時一段時間執行
  • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);:提交一個任務,延時一段時間執行
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);:提交一個任務,時間等於或超過time首次執行任務,之後每隔period*unit重覆執行任務。
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);:創建並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。

Executors

Executors用於輔助創建Executor介面的實現類的實例

Executors是一個簡單工廠,大體包括

  • 創建固定線程數的線程池

    • ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());
      }
      
  • 創建可緩存的線程池

    • ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
      
  • 創建可延時,可周期執行的線程池

    • ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
          return new ScheduledThreadPoolExecutor(corePoolSize);
      }
      
  • 創建單個線程數的線程池

    • ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()));
      }
      
  • 創建Fork/Join框架

    • ExecutorService newWorkStealingPool() {
          return new ForkJoinPool
              (Runtime.getRuntime().availableProcessors(),
               ForkJoinPool.defaultForkJoinWorkerThreadFactory,
               null, true);
      }
      

ThreadPoolExecutor

ThreadPoolExecutor繼承AbstractExecutorService類,而AbstractExecutorService又實現了ExecutorService介面

為什麼引入線程池:

  • 減少因為頻繁創建和銷毀檢查所帶來的開銷
  • 提高程式的響應速度
  • 自動管理線程,調度任務的執行,使得調用者只用關註任務的創建

構造方法參數解釋

線程池的構造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

參數

corePoolSize:核心線程池數量

maximumPoolSize:線程池最大線程數量(核心線程池數量+非核心線程池數量)

keepAliveTime:非核心線程存活時間

unit:存活時間的單位

workQueue:任務隊列,當線程池的線程滿了之後,任務會進入任務隊列等待提交

handler:拒絕策略

拒絕策略

有兩種情況會執行拒絕策略:

  • 核心線程池滿了,任務隊列也滿了,同時非核心線程池也滿了,這個時候再提交新的任務就會拒絕。
  • 提交任務的時候,線程池關閉了。

一共有四種拒絕策略:

  • AbortPolicy:拋出一個異常
  • DiscardPolicy:什麼都不做,等任務自己被回收
  • DiscardOldestPolicy:丟棄隊列中最先入隊的一個任務,並執行當前任務
  • CallerRunsPolicy:用調用者的線程來執行任務

線程池狀態

用AtomicInteger類型的變數ctl來表示線程池的狀態,高3位保存線程池的狀態,低29位保存線程的數量

一共五種狀態:

  • RUNNING(-1):接受新任務,而且會處理已經進入阻塞隊列的任務

  • SHUTDOWN(0):不接受新任務,但會處理已經進入阻塞隊列的任務

  • STOP(1):不再接受新的任務,而且不再處理已經進入阻塞隊列的任務,同時會中斷正在運行的任務

  • TIDYING(2):所有任務都已經終止,工作線程數量為0時,準備調用terminated方法

  • TERMINATED(3):terminated方法已經執行完成

線程池狀態變化示意圖:

7. Future介面

Future模式是多線程中一種常見的設計模式,用來實現非同步執行任務。調用方拿到的是憑證,拿到憑證之後就可以去處理其他的任務,在需要使用結果的時候再使用憑證還獲取調用結果。Future介面的具體實現類是FutureTask

FutureTask

狀態:

  • NEW(0):表示任務的初始化狀態
  • COMPLETING(1):表示任務已經執行完成,屬於中間狀態
  • NORMAL(2):表示任務正常完成,屬於最終狀態
  • EXCEPTIONAL(3):表示任務異常完成,屬於最終狀態
  • CANCELLED(4):表示任務還沒有開始執行就被取消了(非中斷方式),屬於最終狀態
  • INTERRUPTING(5):表示任務還沒有開始執行就被取消了(中斷方式),屬於中間狀態
  • INTERRUPTED(6):表示任務還沒有開始執行就被取消了(中斷方式),已經被中斷,屬於最終狀態

狀態變化示意圖


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在跟著尚矽谷的視頻學習Vue腳手架時,發現main.js文件中的格式同視頻中的完全不一樣,內容倒是可以看懂,但因為初學的緣故,還是照貓畫虎的操作一遍為好,但是在編寫render的時候始終不行,百度半天,沒有找到解決方法,然後思考一下,既然預設實例模版不一樣,那麼,是不是在創建工程時創建成Vue3版本 ...
  • Servlet就是一個介面我們需要寫一個類然後去實現Servlet,就可以被伺服器識別到。request是用來接受客戶端傳過來的參數,respone是用來響應客戶端的頁面。我們所用的容器是一個繼承的java容器tomcat。 ...
  • 前言 最近我小表妹迷上了玩連連看,玩了一個星期了還沒通關,真的是菜。 我實在是看不過去了,直接用python寫了個腳本代碼,一分鐘一把游戲。 快是快,就是聯網玩容易被罵,嘿嘿~ 但是,又不是我玩,有什麼關係呢~ 哈哈哈 😎 代碼 導入所需模塊 # -*- coding:utf-8 -*- impo ...
  • sort包簡介 官方文檔 Golang的sort包用來排序,二分查找等操作。本文主要介紹sort包里常用的函數,通過實例代碼來快速學會使用sort包 sort包內置函數 sort.Ints(x []int) ints := []int{1, 4, 3, 2} fmt.Printf("%v\n", i ...
  • 本文mybatis-spring-boot探討在springboot工程中mybatis相關對象的註冊與載入。 建議先瞭解mybatis在spring中的使用和springboot自動裝載機制,再看此文章。 傳送門:Mybatis源碼解讀-配置載入和Mapper的生成 問題 @MapperScan和 ...
  • 一個工作了7年的粉絲,遇到了一個Zookeeper的問題。 因為接觸過Zookeeper這個技術,不知道該怎麼回答。 我說一個工作了7年的程式員,沒有接觸過主流技術,這不正常。 於是我問了他工資以後,我理解了! 好吧,關於“Zookeeper中Watch機制的實現原理”,看看普通人和高手的回答。 普 ...
  • 作者:何甜甜在嗎 鏈接:https://juejin.cn/post/6916150628955717646 寫在前面 在介紹具體方案之前,首先先介紹一下常見的加密演算法。加密演算法可以分為三大類: 對稱加密演算法 非對稱加密演算法 Hash演算法 對稱加密演算法 加密和解密使用相同的密鑰。對稱加密演算法加密解密 ...
  • RocketMQ學習 1.基本概念 RocketMQ是阿裡巴巴團隊使用java語言開發的一款分散式消息中間件,是一款低延遲,高可用,擁有海量消息堆積能力和靈活拓展性的消息隊列。 rocketmq的官網:http://rocketmq.apache.org gitee倉庫:https://gitee. ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...