Java Executor併發框架(一)

来源:http://www.cnblogs.com/vhua/archive/2016/03/14/5277694.html
-Advertisement-
Play Games

Java是天生就支持併發的語言,支持併發意味著多線程,線程的頻繁創建在高併發及大數據量是非常消耗資源的,因為java提供了線程池。在jdk1.5以前的版本中,線程池的使用是及其簡陋的,但是在JDK1.5後,有了很大的改善。JDK1.5之後加入了java.util.concurrent包,java.u


一、概述

Java是天生就支持併發的語言,支持併發意味著多線程,線程的頻繁創建在高併發及大數據量是非常消耗資源的,因為java提供了線程池。在jdk1.5以前的版本中,線程池的使用是及其簡陋的,但是在JDK1.5後,有了很大的改善。JDK1.5之後加入了java.util.concurrent包,java.util.concurrent包的加入給予開發人員開發併發程式以及解決併發問題很大的幫助。這篇文章主要介紹下併發包下的Executor介面,Executor介面雖然作為一個非常舊的介面(JDK1.5 2004年發佈),但是很多程式員對於其中的一些原理還是不熟悉,因此寫這篇文章來介紹下Executor介面,同時鞏固下自己的知識。如果文章中有出現錯誤,歡迎大家指出。

二、Executors工廠類

對於資料庫連接,我們經常聽到資料庫連接池這個概念。因為建立資料庫連接時非常耗時的一個操作,其中涉及到網路IO的一些操作。因此就想出把連接通過一個連接池來管理。需要連接的話,就從連接池裡取一個。當使用完了,就“關閉”連接,這不是正在意義上的關閉,只是把連接放回到我們的池裡,供其他人在使用。所以對於線程,也有了線程池這個概念,其中的原理和資料庫連接池是差不多的,因此java jdk中也提供了線程池的功能。

線程池的作用:線程池就是限制系統中使用線程的數量以及更好的使用線程

    根據系統的運行情況,可以自動或手動設置線程數量,達到運行的最佳效果:配置少了,將影響系統的執行效率,配置多了,又會浪費系統的資源。用線程池配置數量,其他線程排隊等候。當一個任務執行完畢後,就從隊列中取一個新任務運行,如果沒有新任務,那麼這個線程將等待。如果來了一個新任務,但是沒有空閑線程的話,那麼把任務加入到等待隊列中。

為什麼要用線程池:

  1. 減少線程創建和銷毀的次數,使線程可以多次復用
  2. 可以根據系統情況,調整線程的數量。防止創建過多的線程,消耗過多的記憶體(每個線程1M左右)

Java裡面線程池的頂級介面是Executor,但是嚴格意義上講Executor並不是一個線程池,而只是一個執行線程的工具。真正的線程池介面是ExecutorService。Executors類,提供了一系列工廠方法用於創先線程池,返回的線程池都實現了ExecutorService介面。

比較重要的幾個類:

ExecutorService

真正的線程池介面。

ScheduledExecutorService

能和Timer/TimerTask類似,解決那些需要任務重覆執行的問題。

ThreadPoolExecutor

ExecutorService的預設實現。

ScheduledThreadPoolExecutor

繼承ThreadPoolExecutor的ScheduledExecutorService介面實現,周期性任務調度的類實現。

要配置一個線程池是比較複雜的,尤其是對於線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,因此在Executors類裡面提供了一些靜態工廠,生成一些常用的線程池。

1. newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

 

創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串列執行所有任務。如果這個唯一的線程因為異常結束,那麼會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

2. newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

 

創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,在提交新任務,任務將會進入等待隊列中等待。如果某個線程因為執行異常而結束,那麼線程池會補充一個新線程。

3. newCachedThreadPool

public static ExecutorService newCachedThreadPool()

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

 

創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,

那麼就會回收部分空閑(60秒處於等待任務到來)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池的最大值是Integer的最大值(2^31-1)。

4. newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

 

創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。

 

實例:

註:使用了java8的lambda表達式以及stream

1.newSingleThreadExecutor

public class SingleThreadExecutorTest {

    public static void main(String[] args) {
ExecutorService executor
= Executors.newSingleThreadExecutor();
IntStream.range(
0, 5).forEach(i -> executor.execute(() -> { String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); })); try { //close pool executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (!executor.isTerminated()) { executor.shutdownNow(); } } } }

 

輸出結果:

finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-1

 

線程名都一樣,說明是同一個線程

2.newFixedThreadPool

public class FixedThreadExecutorTest {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(3);

        IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                String threadName = Thread.currentThread().getName();
                System.out.println("finished: " + threadName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
//close pool 同上...
    
    }
}

 

輸出結果:

finished: pool-1-thread-2
finished: pool-1-thread-3
finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-2
finished: pool-1-thread-3

 

只創建了三個線程

3.newCachedThreadPool

public class CachedThreadExecutorTest {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newCachedThreadPool();
        IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                String threadName = Thread.currentThread().getName();
                System.out.println("finished: " + threadName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
//close pool 同上... } }

 

輸出結果:

finished: pool-1-thread-1
finished: pool-1-thread-2
finished: pool-1-thread-3
finished: pool-1-thread-6
finished: pool-1-thread-5
finished: pool-1-thread-4

 

創建了6個線程

4.newScheduledThreadPool

public class ScheduledThreadExecutorTest {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

        executor.scheduleAtFixedRate(() -> System.out.println(System.currentTimeMillis()), 1000, 2000, TimeUnit.MILLISECONDS);

            //close pool 同上
    }
}

 

輸出結果:

1457967177735
1457967179736
1457967181735

 

三:ThreadPoolExecutor詳解

java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地瞭解Java中的線程池,必須先瞭解這個類。下麵我們來看一下ThreadPoolExecutor類的具體實現源碼。

  在ThreadPoolExecutor類中提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

 

從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。

ThreadPoolExecutor的完整構造方法的簽名是:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

 

  • corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實現原理有非常大的關係。在創建了線程池後,預設情況下,線程池中並沒有 任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者 prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建 corePoolSize個線程或者一個線程。預設情況下,在創建了線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,當線 程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
  • maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示線上程池中最多能創建多少個線程;
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。預設情況下,只有當線程池中的線程數大於corePoolSize 時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize。即當線程池中的線程數大於corePoolSize 時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了 allowCoreThreadTimeOut(boolean)方法,線上程池中的線程數不大於corePoolSize 時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
  • unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這裡的阻塞隊列有以下幾種選擇:
    ArrayBlockingQueue; //有界隊列
    LinkedBlockingQueue; //無界隊列
    SynchronousQueue; //特殊的一個隊列,只有存在等待取出的線程時才能加入隊列,可以說容量為0,是無界隊列

     ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

  • threadFactory:線程工廠,主要用來創建線程;
  • handler:表示當拒絕處理任務時的策略,有以下四種取值:
    ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
    ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重覆此過程)
    ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務 

     

    ThreadPoolExecutorExecutors類的底層實現。

    在JDK幫助文檔中,有如此一段話:

    強烈建議程式員使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)Executors.newSingleThreadExecutor()(單個後臺線程)

    它們均為大多數使用場景預定義了設置。”

    下麵介紹一下幾個類的源碼:

    ExecutorService   newFixedThreadPool (int nThreads):固定大小線程池。

    可以看到,corePoolSize和maximumPoolSize的大小是一樣的(實際上,後面會介紹,如果使用無界queue的話 maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設值表明什麼?-就是該實現不想keep alive!最後的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是無界的。

public static ExecutorService newFixedThreadPool(int nThreads) {   
          return new ThreadPoolExecutor(nThreads, nThreads,   
                                        0L, TimeUnit.MILLISECONDS,   
                                       new LinkedBlockingQueue<Runnable>());   
      }

  ExecutorService   newSingleThreadExecutor():單線程

 public static ExecutorService newSingleThreadExecutor() {   
          return new FinalizableDelegatedExecutorService   
              (new ThreadPoolExecutor(1, 1,   
                                      0L, TimeUnit.MILLISECONDS,   
                                      new LinkedBlockingQueue<Runnable>()));   
      }

ExecutorService newCachedThreadPool():無界線程池,可以進行自動線程回收

這個實現就有意思了。首先是無界的線程池,所以我們可以發現maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue。可能對於該BlockingQueue有些陌生,簡單說:該 QUEUE中,每個插入操作必須等待另一個線程的對應移除操作。

public static ExecutorService newCachedThreadPool() {   
           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   
                                       60L, TimeUnit.SECONDS,   
                                        new SynchronousQueue<Runnable>());   
    }

 

從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,AbstractExecutorService是一個抽象類,它實現了ExecutorService介面。

public abstract class AbstractExecutorService implements ExecutorService 

 

 而ExecutorService又是繼承了Executor介面

 

public interface ExecutorService extends Executor 

 

 

我們看一下Executor介面的實現:

public interface Executor {
    void execute(Runnable command);
}

 

到這裡,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關係了。

  Executor是一個頂層介面,在它裡面只聲明瞭一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;

  然後ExecutorService介面繼承了Executor介面,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象類AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中聲明的所有方法;

  然後ThreadPoolExecutor繼承了類AbstractExecutorService。

  在ThreadPoolExecutor類中有幾個非常重要的方法:

public void execute(Runnable command)

public <T> Future<T> submit(Callable<T> task)

public void shutdown()

public List<Runnable> shutdownNow()  //返回未執行的任務

execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。

  submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中 並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的 實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。

  shutdown()和shutdownNow()是用來關閉線程池的。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 必知:軟體企業要求基礎軟體工程師具備六大基本素質,即良好的編碼能力、自覺的規範意識和團隊精神、認識和運用資料庫的能力、較強的英語閱讀和寫作能力、具有軟體工程的概念和求知欲和進取心。                   1.良好的編碼能力。軟體人員的一個重要職責是把用戶的需求功能用某種電腦語言予以實
  • python利用or在列表解析中調用多個函數.py
  • 2.在定義常量的時候C語言中定義為const而JAVA中為final3.在JAVA聲明成員變數的時候,使用static來定義。4.在JAVA中的boolean類型只包括true和false,但是在C中非0為true,0為false5.在JAVA中byte、short、int、long其存儲空間分別為
  • 今天幫同學想用C實現數組的折半查找,本來演算法挺簡單的,可是折騰了好幾個小時才發現問題在哪,這個sizeof坑人不淺啊。 明白這裡了,附上一篇C實現折半的代碼  
  • 一、Java的特點:一次編譯,到處運行時間。   C語言在windows下執行:C源程式(.c)——>編譯 windows可執行文件(.exe)——>windows操作系統  Java語言:Java源文件——>編譯 Java位元組碼文件(.class)——>JVM虛擬機下 (能直接解釋Java位元組碼C
  • -->位元組碼解釋器工作就是通過改變這個計數器的值來選取下一條需要執行的位元組碼指令,分支、迴圈、跳轉、異常處理、線程恢復等基礎功能都需要依賴計數器來完成。 -->為了線程切換後能恢復到正確的執行位置,每條線程都需要一個獨立的程式計數器,各條線程之間計數器互不影響,獨立存儲,我們稱這類記憶體區域為"線程私
  • 初學python第一天,希望自己真正瞭解電腦語言,並且做出成效。   寫下學習筆記,記錄學習進度,娛樂學習,不斷成長。   python詳細介紹:   python是什麼?運用到哪裡?有哪些在使用它? python是一門編程語言,可以運用到各種場景,基本大型公司都在用它,主要運用在網路編程方面。如
  • 這是個Linux上簡易的http伺服器實現,主要就是tcp socket 和 pipe管道cgi重定向. 解析http協議. 簡單的處理並反饋給客戶端.並附帶一個client.c 測試服務端. 比較簡陋 去掉註釋400行以下. 能夠幫我們加深web伺服器的理解.
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...