前言:由於最近在做SDK的功能,需要設計線程池。看了很多資料不知道從何開始著手,突然發現了AsyncTask有對線程池的封裝,so,就拿它開刀,本文將從AsyncTask的基本用法,到簡單的封裝,再到任務隊列分析,最後自定義線程池。 1、概念 Android 中的非同步任務常用的一種方式是:Handl ...
前言:由於最近在做SDK的功能,需要設計線程池。看了很多資料不知道從何開始著手,突然發現了AsyncTask有對線程池的封裝,so,就拿它開刀,本文將從AsyncTask的基本用法,到簡單的封裝,再到任務隊列分析,最後自定義線程池。
1、概念
Android 中的非同步任務常用的一種方式是:Handler + Thread 組合來實現的。Thread 負責子線程的耗時操作,Handler 負責線程間的通信,用的最多的當屬子線程和主線程通信。
Android 為了簡化操作,提供了 AsyncTask 類來實現非同步任務,並且輕鬆實現子線程和主線程間的通信。
2、AsyncTask的簡單封裝
三個參數代表的含義
- Params:第一個參數是啟動任務傳進來的參數;
- Progress:第二個參數是用來顯示進度條的參數;
- Result:第三個參數是後臺執行後返回的參數的類型。
package com.app; import android.os.AsyncTask; /** * Created by ${zyj} on 2016/8/2. */ public class MyTask<T> extends AsyncTask<T , Integer , T> { private TaskListener taskListener ; public MyTask(){ } //執行預處理,它運行於UI線程,可以為後臺任務做一些準備工作,比如繪製一個進度條控制項。 @Override protected void onPreExecute() { if ( taskListener != null ){ taskListener.start(); } } //運行於UI線程,可以對後臺任務的結果做出處理,結果就是doInBackground(Params...)的返回值。 @Override protected void onPostExecute(T t) { if ( taskListener != null ){ taskListener.result( t ); } } /** * 更新子線程進度,運行於UI線程 * @param values */ @Override protected void onProgressUpdate(Integer... values) {; if ( taskListener != null ){ taskListener.update( values[0] ); } } //運行與後臺線程 @Override protected T doInBackground(T... ts) { if ( taskListener != null ){ return (T) taskListener.doInBackground( ts[0] ) ; } return null; } public MyTask setTaskListener(TaskListener taskListener ){ this.taskListener = taskListener ; return this ; } /** * 更新進度 * @param progress */ public void updateProgress( int progress ){ publishProgress( progress ); } public interface TaskListener<T>{ void start() ; void update( int progress ) ; T doInBackground( T t ); void result( T t ); } /** * 取消一個正在執行的任務 */ public void cancle(){ if ( !isCancelled() ){ cancel( true ) ; } } }
3、簡單的非同步任務使用
package com.app; import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import wifi.app.wei.com.myapplication.R; public class MainActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); new MyTask<String>().setTaskListener(new MyTask.TaskListener() { @Override public void start() { Log.d( "task--" , "start 開始了, 運行在主線程" ) ; } @Override public void update(int progress) { } @Override public Object doInBackground(Object o) { Log.d( "task--" , "doInBackground , 運行在子線程" ) ; return null; } @Override public void result(Object o) { Log.d( "task--" , "result , 運行在主線程" ) ; } }).execute( "" ) ; } }
4、帶有進度更新的非同步任務使用
package com.app; import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import android.widget.TextView; import wifi.app.wei.com.myapplication.R; public class MainActivity extends AppCompatActivity { private TextView textView ; private MyTask myTask ; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); textView = (TextView) findViewById( R.id.tv1 ); myTask = new MyTask<String>().setTaskListener(new MyTask.TaskListener() { @Override public void start() { Log.d( "task--" , "start 開始了, 運行在主線程" ) ; textView.setText( "任務開始了" ); } @Override public void update(int progress) { textView.setText( "進度" + progress ); } @Override public Object doInBackground(Object o) { Log.d( "task--" , "doInBackground , 運行在子線程" ) ; for ( int i = 0 ; i < 100 ; i++ ){ try { Thread.sleep( 100 ) ; myTask.updateProgress( i ) ; //每隔100毫秒,更新一下進度 } catch (InterruptedException e) { e.printStackTrace(); } } return "結束了"; } @Override public void result(Object o) { Log.d( "task--" , "result , 運行在主線程" ) ; textView.setText( "" + o ); } }) ; //開始執行任務 myTask.execute( "" ) ; } }
執行效果圖
5、AsyncTask 任務執行應該註意的細節
(1)、如果非同步任務需要聯網,則需要添加聯網許可權
<uses-permission android:name="android.permission.INTERNET"/>
(2)、AsyncTask實例必須在UI線程中創建,execute(Params…)方法必須在UI線程中調用。不用手動調用onPreExecute()。
(3)、一個任務只能被執行一次
6、如何取消任務
可以調用 myTask.cancle() ;
但是這個方法並沒有真正的結束任務,只是設置了一個標誌位,把當前線程中斷了。
取消任務實踐
package com.app; import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import android.view.View; import android.widget.TextView; import wifi.app.wei.com.myapplication.R; public class MainActivity extends AppCompatActivity { private TextView textView ; private MyTask myTask ; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); textView = (TextView) findViewById( R.id.tv1 ); textView.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { //取消任務 myTask.cancle(); } }); myTask = new MyTask<String>().setTaskListener(new MyTask.TaskListener() { @Override public void start() { Log.d( "task--" , "start 開始了, 運行在主線程" ) ; textView.setText( "任務開始了" ); } @Override public void update(int progress) { textView.setText( "進度" + progress ); } @Override public Object doInBackground(Object o) { Log.d( "task--" , "doInBackground , 運行在子線程" ) ; for ( int i = 0 ; i < 100 ; i++ ){ try { Thread.sleep( 100 ) ; myTask.updateProgress( i ) ; //每隔100毫秒,更新一下進度 } catch (InterruptedException e) { e.printStackTrace(); } } return "結束了"; } @Override public void result(Object o) { Log.d( "task--" , "result , 運行在主線程" ) ; textView.setText( "" + o ); } }) ; //開始執行任務 myTask.execute( "" ) ; } }
當點擊textView時,調用了 myTask.cancle() ;方法後,Android studio 控制台拋出了異常
通過這裡我們發現,AsyncTask 雖然提供了cancle( true ) 方法來停止任務,但是這個方法只是中斷了這個線程,但是並不能真正意思上的停止任務,這也是很多人說 AsyncTask 的弊端。極容易造成記憶體溢出的。
幾種結束任務的間接實現方式:
1、判斷標誌位的辦法:
我們要知道在java的線程中,沒有辦法停止一個正在運行中的線程。在Android的AsyncTask中也是一樣的。如果必須要停止一個線程,我們可以採用這個線程中設置一個標誌位,然後線上程run方法或AsyncTask的doInBackground方法中的關鍵步驟判斷這個標誌位以決定是否繼續執行。然後在需要終止此線程的地方改變這個標誌位以達到停止線程的目的。
2、合理的利用Exception
從外部調用AsyncTask的cancel方法並不能停止一個已經啟動的AsyncTask。這個cancel方法的作用與線程的interrupt方法相似,調用了一個線程的interrupt方法之後線程仍然運行,但是如果該線程的run方法裡面調用過sleep的或者wait方法後,處於sleep或wait狀態,則sleep和wait立即結束並且拋出InterruptedException異常。AsyncTask的cancel方法也一樣,如果在這個Task的doInBackground方法中調用了sleep或wait方法,當在UI線程中調用了這個Task實例的cancel方法之後,sleep或wait立即結束並且拋出InterruptedException異常,但是如果捕獲該異常的代碼後面還有其他代碼,則這些代碼還會繼續執行。
3、可以在UI上做手腳
如果用戶在後臺線程正獲取內容時做出了取消的行為,我們可以根據用戶的這種行為在UI上立即做出反饋,此時,即使線程完成了數據的Loading,我們也不讓數據顯示出來,算是一種投機取巧的辦法吧。
7、AsyncTask 串列處理任務 和 並行處理任務
在上面的代碼演示中,執行任務用的都是 myTask.execute() , 這個預設是串列執行任務的。比如同一時刻有兩個任務要處理,AsyncTask 會先執行第一個任務,等第一個任務執行結束,然後才會執行第二個任務。
在AsyncTask中還有一個並行處理任務的方法:executeOnExecutor( Executor exe , Params... params ) 。
下麵是串列執行任務execute()的源碼
通過看源碼,發現其實串列執行任務也是調用了並行的方法 executeOnExecutor () , 只不過啟用了一個預設的 sDefaultExecutor (sDefaultExecutor 是一個串列的線程池)。
有串列線程池,那麼勢必就有一個並行線程池 , 在AsyncTask裡面源碼裡面定義了一個並行線程池: THREAD_POOL_EXECUTOR 。
可以看到並行 THREAD_POOL_EXECUTOR 是通過 new ThreadPoolExecutor() 來創建的
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
參數說明:
corePoolSize: 線程池維護線程的最少數量
maximumPoolSize:線程池維護線程的最大數量
keepAliveTime: 線程池維護線程所允許的空閑時間
unit: 線程池維護線程所允許的空閑時間的單位
workQueue: 線程池所使用的緩衝隊列
handler: 線程池對拒絕任務的處理策略
我們知道,受限於硬體、記憶體和性能,我們不可能無限制的創建任意數量的線程,因為每一臺機器允許的最大線程是一個有界值。也就是說ThreadPoolExecutor管理的線程數量是有界的。線程池就是用這些有限個數的線程,去執行提交的任務。然而對於多用戶、高併發的應用來說,提交的任務數量非常巨大,一定會比允許的最大線程數多很多。為瞭解決這個問題,必須要引入排隊機制,或者是在記憶體中,或者是在硬碟等容量很大的存儲介質中。J.U.C提供的ThreadPoolExecutor只支持任務在記憶體中排隊,通過BlockingQueue暫存還沒有來得及執行的任務。
任務的管理是一件比較容易的事,複雜的是線程的管理,這會涉及線程數量、等待/喚醒、同步/鎖、線程創建和死亡等問題。ThreadPoolExecutor與線程相關的幾個成員變數是:keepAliveTime、allowCoreThreadTimeOut、poolSize、corePoolSize、maximumPoolSize,它們共同負責線程的創建和銷毀。
corePoolSize:
線程池的基本大小,即在沒有任務需要執行的時候線程池的大小,並且只有在工作隊列滿了的情況下才會創建超出這個數量的線程。這裡需要註意的是:在剛剛創建ThreadPoolExecutor的時候,線程並不會立即啟動,而是要等到有任務提交時才會啟動,除非調用了prestartCoreThread/prestartAllCoreThreads事先啟動核心線程。再考慮到keepAliveTime和allowCoreThreadTimeOut超時參數的影響,所以沒有任務需要執行的時候,線程池的大小不一定是corePoolSize。
maximumPoolSize:
線程池中允許的最大線程數,線程池中的當前線程數目不會超過該值。如果隊列中任務已滿,並且當前線程個數小於maximumPoolSize,那麼會創建新的線程來執行任務。這裡值得一提的是largestPoolSize,該變數記錄了線程池在整個生命周期中曾經出現的最大線程個數。為什麼說是曾經呢?因為線程池創建之後,可以調用setMaximumPoolSize()改變運行的最大線程的數目。
poolSize:
線程池中當前線程的數量,當該值為0的時候,意味著沒有任何線程,線程池會終止;同一時刻,poolSize不會超過maximumPoolSize。
keepAliveTime
當線程空閑時間達到keepAliveTime,該線程會退出,直到線程數量等於corePoolSize。如果allowCoreThreadTimeout設置為true,則所有線程均會退出直到線程數量為0。
瞭解了各個參數的含義之後,我們來看看 AsyncTask 中預設的並行線程隊列 THREAD_POOL_EXECUTOR 各項的數值
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1; private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128);
- corePoolSize 為cup數加 1 ;
- maximumPoolSize 為cup數的2倍加1
- 存活時間為1秒
- 任務緩存隊列為 LinkedBlockingQueue
小測試:我手上的手機是聯想 k50-t5 , 在設置裡面看到處理器為 8 核1.7GHZ , 運行 Runtime.getRuntime().availableProcessors(); 方法得到的值為:8 。
另外我們也可以總結出:
- 同一臺手機上,THREAD_POOL_EXECUTOR 的 corePoolSize 和 maximumPoolSize 的值是固定的。
- 在不同的手機上,THREAD_POOL_EXECUTOR 的 corePoolSize 和 maximumPoolSize 的值是不同的。 這種動態設置的方法值得我們學習,在不同的設備上所使用的策略是不同的。但是也是方式也是有弊端的,任務併發數是由cpu的限定的,不可人為的修改。
總結:
//開始執行 串列任務 myTask.execute( "" ) ; 或者 myTask.executeOnExecutor(AsyncTask.SERIAL_EXECUTOR , "" ) ; //開始執行 並行任務 myTask.executeOnExecutor( AsyncTask.THREAD_POOL_EXECUTOR , "" ) ;
8、自定義線程池
上一部分我們已經明白了AsyncTask 的預設並行線程池 THREAD_POOL_EXECUTOR 是通過 new ThreadPoolExecutor() 來創建的 , 那麼我們也可以自己定義一個線程池。
首先來看 ThreadPoolExecutor 的構造函數
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
通過看構造方法,發現 corePoolSize 、maximunPoolSize 、keepAliveTime 、unit 、workQueue 是必須要寫的。
分析最後一個構造
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException();
corePoolSize :最小值 0
maximunPoolSize :最小值 1
corePoolSize 必須小於或者等於 maximunPoolSize
主要來看 workQueue , 這個是就是線程隊列了。
下麵是AsyncTask並行線程池 THREAD_POOL_EXECUTOR 裡面所使用的線程隊列,128 代表線程隊列的長度
private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128);
下麵給出一個完整的例子:
//創建緩衝隊列 隊列長度:100 BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(100); //創建線程池 核心線程:5個 最大線程:10個 線程空閑存活時間:1秒 Executor executor = new ThreadPoolExecutor( 5 , 10 , 1 , TimeUnit.SECONDS , sPoolWorkQueue ) ; //添加任務到緩衝隊列 myTask1.executeOnExecutor( executor , "" ) ;
線程創建規則
一個任務通過 execute(Runnable)方法被添加到線程池,任務就是一個 Runnable類型的對象,任務的執行方法就是 Runnable類型對象的run()方法。
當一個任務通過execute(Runnable)方法欲添加到線程池時:
1、 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
2、 如果此時線程池中的數量等於 corePoolSize,但是緩衝隊列 workQueue未滿,那麼任務被放入緩衝隊列。
3、 如果此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
4、 如果此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。也就是:處理任務的優先順序為:核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5、 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
線程池按以下行為執行任務
- 當線程數小於核心線程數時,創建線程。
- 當線程數大於等於核心線程數,且任務隊列未滿時,將任務放入任務隊列。
- 當線程數大於等於核心線程數,且任務隊列已滿
- 若線程數小於最大線程數,創建線程
- 若線程數等於最大線程數,拋出異常,拒絕任務
任務隊列執行的邏輯:
FIFO 先進先出
9、自定義線程池的簡便方法
在第8節我們詳解瞭如何自定義線程池,講解了 ThreadPoolExecutor 構造方法的每個參數的用法,但是如果自定義線程池都要寫那麼多參數,豈不是很麻煩。
幸運的是,系統的 java.util.concurrent 包下麵 Executors 類提供了很多簡單的方法,供我們使用,這對苦逼的碼農來說,是很好的福音。
Executors 裡面的方法有
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
猛一看,方法那麼多,其實把方法單獨領出來
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) public static ExecutorService newWorkStealingPool() public static ExecutorService newWorkStealingPool(int parallelism) public static ScheduledExecutorService newSingleThreadScheduledExecutor() public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) public static ExecutorService newCachedThreadPool() public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) public static ScheduledExecutorService newScheduledThreadPool ( int corePoolSize, ThreadFactory threadFactory)
這樣一歸類,就清晰很多了,就12個方法,然後每2個方法又可以歸為一組,也就是6組。
9.1 newFixedThreadPool(int nThreads) 創建固定大小的線程池
創建一個線程數量固定大小,任務隊列無限大的線程池。當隊列中需要的線程數超出定義的線程的時候,所有任務將在隊列中排隊,等待空閑線程執行。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
9.2 newSingleThreadExecutor 創建只有一個線程的線程池,其實就相當於串列線程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
9.3、newWorkStealingPool
總結:
1、本篇文章的demo例子已上傳至github: https://github.com/zyj1609wz/AsyncTaskDemo
2、本人微信公眾賬號:zhaoyanjun125 , 歡迎關註