線程組和線程池 一. 線程組 1. 線程組介紹及使用 Java使用ThreadGroup來表示線程組,它可以對一批線程進行分類管理,Java允許直接對線程組進行控制。對線程組的控制相當於控制這批線程。 在預設情況下,子線程和創建它的父線程同屬於一個線程組。 一旦線程假如某個線程組之後,該線程將一直屬 ...
線程組和線程池
一. 線程組
1. 線程組介紹及使用
Java使用ThreadGroup來表示線程組,它可以對一批線程進行分類管理,Java允許直接對線程組進行控制。對線程組的控制相當於控制這批線程。
在預設情況下,子線程和創建它的父線程同屬於一個線程組。
一旦線程假如某個線程組之後,該線程將一直屬於該線程組,知道該線程死亡,線程運行途中不能改變它所屬的線程組。
Thread提供了不同構造器設置新創建的線程屬於哪個線程組。提供getThreadGroup()方法返回該線程所屬的線程組對象。
ThreadGroup類提供瞭如下兩個構造器創建實例。
- ThreadGroup(String name):以指定的線程組名字來創建新的線程組
- ThreadGroup(ThreadGroup parent,String name):以指定的名字、指定的父線程組創建一個新線程組
Java程式不允許改線程組名字,通過getName()方法獲取線程組名字。
ThreadGroup類提供瞭如下常用的方法
- int activeCount():返回此線程組中活動的線程數目
- interrupt():中斷此線程組中的所有線程
- isDaemon():判斷該線程組是否是後臺線程組
- setDaemon(boolean daemon):把該線程組設置成後臺線程組。
- setMaxPriority(int pri):設置線程組的最高優先順序。
2.線程組和異常處理機制
從Java 5開始,Java加強了線程的異常處理,如果線程執行過程中拋出了一個未處理異常,JVM在結束該線程之前會自動查找是否有對應的Thread.UncaughtExceptionHandler對象,如果找到該處理器對象,則會調用該對象的uncaughtException(Thread t,Throwable e)方法來處理該異常。
Thread類提供了兩個方法設置異常處理器。
- static setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):為該線程類的所有線程實例設置預設的異常處理器
- setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):為指定的線程實例設置異常處理器
ThreadGroup類實現了Thread.UncaughtExceptionHandler介面,所以每個線程所屬的線程組將會作為預設的異常處理器。
如果線程執行過程中拋出了一個未處理異常,JVM在結束該線程之前會自動查找是否有對應的Thread.UncaughtExceptionHandler對象,如果找到該處理器對象,則會調用該對象的uncaughtException(Thread t,Throwable e)方法來處理該異常;否則,JVM會調用該線程所屬的線程組對象的uncaughtException()方法來處理該異常。
線程組處理異常的流程如下:
- 如果該線程組有父線程組,則調父線程組的uncaughtException()方法來處理該異常。
- 如果該線線程實例所屬的線程類有預設的異常處理器,那麼調用該異常處理器來處理異常
- 如果該對象是ThreadDeath對象,則不做任何處理;否則,將異常跟蹤棧的信息列印到System.err錯誤輸出流,並結束該線程。
下麵主程式設置了異常處理器。
package com.gdut.thread; class MyExHandler implements Thread.UncaughtExceptionHandler{ @Override public void uncaughtException(Thread t, Throwable e) { System.out.println(t+"線程出現了異常"+e); } } public class ExHandler { public static void main(String[] args) { Thread.currentThread().setUncaughtExceptionHandler(new MyExHandler()); int a=5/0; System.out.println("程式正常結束"); } }
輸出:Thread[main,5,main]線程出現了異常java.lang.ArithmeticException: / by zero
二. 線程池
系統啟動一個新線程的成本是非常高的,因為它涉及與操作系統交互。當程式中需要創建大量生存期很短暫的線程時,應該考慮使用線程池來提高系統性能。
與資料庫連接池類似的是,線程池在系統啟動時即創建大量空閑的線程,當序將一個Runnable對象或Callable對象創給線程池,線程池就會啟動一個線程來執行他們的run()或call()方法,當run()或call()方法執行結束後,該線程並不會死亡,而是再次返回線程池中稱為空閑狀態,等待執行下一個Runnable對象的run()或call()方法。
除此之外,使用線程池可以有效地控制系統中併發線程的數量,當系統中包含大量併發線程時,會導致系統性能劇烈下降,甚至導致JVM崩潰。
2.1 Java 8改進的線程池
從Java 5開始,Java內建支持線程池。Java 5新增了一個Executors工廠類來生產線程池,包含如下靜態方法生產不同的線程池。
- newCachedThreadPool():創建一個具有緩存功能的線程池,系統根據需要創建線程,這些線程將被還存線上程池中
- newFixdThreadPool(int nThread):創建一個可重用的,具有固定線程數的線程池
- newSingleThreadExecutor():創建一個只有單線程的線程池,相當於newFixdThreadPool(int nThread)傳入參數1。
- newScheduledThreadPool(int corePoolSize):創建具有指定線程數的線程池,它可以在指定延遲後執行線程任務。corePoolSize指池中所保存的線程數,即使線程是空閑的也被保存線上程池內
- newSingleThreadledExecutor():創建一個線程的線程池,它可以在指定延遲後執行線程任務。
- ExecutorService newWorkStealingPool(int parallelism):創建持有足夠線程的線程池來支持給定的並行級別,該方法還會使用多個隊列來減少競爭。
- ExecutorService newWorkStealingPool():該方法是前一個方法的簡化版,他根據當前機器的CPU核數設置並行級別。
上面7個方法中的前三個方法返回一個ExecutorService對象,該對象代表一個線程池,它可以執行Runnable對象和Callable對象所代表的線程,中間兩個方法返回一個ScheduledExecutorService線程池,它是ExecutorService的子類,它可以在指定延遲後執行線程任務,最後兩個方法是Java 8新增的,他充分利用多CPU的並行能力。這兩個方法生成的work stealing池,都相當於後臺線程池,如果所有的前臺線程都死亡了,work stealing池中的線程也會自動死亡。
ExecutorService對象的submit()方法可以執行Runnable對象或C對象代表的任務,返回Future對象,該對象代表call()方法的返回值。
ScheduledExecutorServic代表指定延遲後或周期性的執行任務的線程池,它提供瞭如下四個方法
- ScheduledFuture<V> schedule(Callable<V> callble,long delay,TimeUnit unit):指定callable任務將在delay延遲後執行
- ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit):指定command任務將在delay延遲後執行
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit):指定command任務將在delay延遲後以設定頻率重覆執行,也就是說,在initialDelay後開始執行,依次在initialDelay+period、initialDelay+period*2...重覆執行。
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit):創建並執行一個給定初始延遲後首次啟動的定期操作,以後每一次執行終止和下一次執行的開始之間都存在給定的延遲。如果任務在某一次執行時遇到異常,就會取消後續執行;否則,只能通過程式顯式取消或終止該任務。
用完一個線程池後,應該調用線程池的shutdown()方法啟動線程池的關閉序列,調用shutdown()方法後的線程池不再接受任務,但會將以前所有已提交任務執行完成;另外也可以調用shutdownNow()方法關閉線程池,該方法試圖停止所有在執行的活動任務,暫停處理正在等待執行的任務,並返回等待執行的任務列表。
使用線程池的任務步驟如下:
- 調用Executors類的靜態工廠方法創建一個ExecutorService對象,該對象代表一個線程池
- 創建Runnable實現類或Callable實現類的實例,作為線程執行任務
- 調用ExecutorService對象的submit()方法來提交Runnable或Callable的實例代表的任務
- 當不想提交任何任務時,調用ExecutorService對象的shutdown()方法來關閉線程池。
實例如下:
package com.gdut.thread.threadPool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolTest { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(6); Runnable runnable1 = ()->{ for (int i = 0; i < 100; i++) { System.out.println(Thread.currentThread().getName()+" "+i); } }; Runnable runnable2 = ()->{ for (char i = 'a'; i < 'z'; i++) { System.out.println(Thread.currentThread().getName()+" "+i); } }; pool.submit(runnable1); pool.submit(runnable2); pool.submit(runnable2); pool.shutdown(); } }
2.2 Java 8 增強的線程池
為了充分利用多CPU的優勢、多核CPU的性能優勢。可以考多個小任務,把小任務放到多個處理器核心上並行執行;當多個小任務執行完成之後,再將這些執行結果合併起來即可。Java 7提供了ForkJoinPool來支持這個功能。
ForkJoinPool是ExecutorService的實現類,因此是一種特殊的線程池。提供瞭如下兩個常用的構造器
- ForkJoinPool(int parallelism):創建一個包含parallelism個並行線程的ForkJoinPool.
- ForkJoinPool():以Runtime.availableProssesors()方法的返回值作為paralelism參數來創建ForkJoinPool.
Java 8進一步拓展了ForkJoinPool的功能,Java 8增加了通用池功能。ForkJoinPool通過如下兩個方法提供通用池功能。
- ForkJoinPool commonPool():該方法返回一個通用池,通用池的狀態不會受shutdown()或shutdownNow()方法的影響。
- int getCommonPoolParallelism():該方法返回通用池的並行級別。
創建了通用池ForkJoinPool實例之後,就可調用ForkJoinPool的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法來執行指定任務了。其中,ForkJoinTask代表一個並行,合併的任務。
ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecursiveAction和recursiveTask。其中RecursiveAction代表沒有返回值的任務,RecursiveTask代表有返回值的任務。
下麵程式將一個大任務(列印0~500)的數值分成多個小任務,並將任務交給ForkJoinPool來執行。
package com.gdut.thread.threadPool; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; class PrintTask extends RecursiveAction{ private static final int THRESHOLD = 50; private int start; private int end; public PrintTask(int start,int end) { this.start = start; this.end = end; } @Override protected void compute() { if(end-start<THRESHOLD){ for (int i = start; i <end ; i++) { System.out.println(Thread.currentThread().getName()+"的i值"+i); } }else{ //當end與start的差大於THRESHOLD時,即要列印的數超過50時,將大任務分成兩個小任務 int middle = (end+start)/2; PrintTask left = new PrintTask(start,middle); PrintTask right = new PrintTask(middle,end); left.fork(); right.fork(); } } } public class ForkJoinPoolTest{ public static void main(String[] args) throws InterruptedException{ ForkJoinPool pool = new ForkJoinPool(); pool.submit(new PrintTask(0,500)); pool.awaitTermination(2, TimeUnit.SECONDS); pool.shutdown(); } }
8核電腦的執行效果
下麵程式示範了使用RecursiveTask對一個長度為100的數組的元素值進行累加。
package com.gdut.thread.threadPool; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; class CalTask extends RecursiveTask<Integer>{ private static final int THRESHOLD = 20; private int[] arr; private int start; private int end; public CalTask(int[] arr,int start,int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if(end-start<THRESHOLD){ for (int i = start; i <end ; i++) { sum +=arr[i]; System.out.println(Thread.currentThread().getName()+"加上"+arr[i]+"後的累加值為:"+sum); } return sum; }else{ int middle = (end+start)/2; //將大任務分成兩個小任務 CalTask left = new CalTask(arr,start,middle); CalTask right = new CalTask(arr,middle,end); //執行兩個小任務 left.fork(); right.fork(); //將兩個小任務累加的結果合併起來 return left.join() + right.join(); } } } public class Sum { public static void main(String[] args) throws Exception{ int[] arr = new int[100]; Random rand = new Random(); int total = 0; for (int i = 0; i <100 ; i++) { int tmp = rand.nextInt(20); total += (arr[i] = tmp); } System.out.println(total); ForkJoinPool pool = new ForkJoinPool(); Future future = pool.submit(new CalTask(arr,0,100)); System.out.println(future.get()); pool.shutdown(); } }