一、線程池 Sun在Java5中,對Java線程的類庫做了大量的擴展,其中線程池就是Java5的新特征之一,除了線程池之外,還有很多多線程相關的內容,為多線程的編程帶來了極大便利。為了編寫高效穩定可靠的多線程程式,線程部分的新增內容顯得尤為重要。 有關Java5線程新特征的內容全部在java.uti ...
一、線程池
Sun在Java5中,對Java線程的類庫做了大量的擴展,其中線程池就是Java5的新特征之一,除了線程池之外,還有很多多線程相關的內容,為多線程的編程帶來了極大便利。為了編寫高效穩定可靠的多線程程式,線程部分的新增內容顯得尤為重要。 有關Java5線程新特征的內容全部在java.util.concurrent下麵,裡面包含數目眾多的介面和類,熟悉這部分API特征是一項艱難的學習過程。當然新特征對做多線程程式沒有必須的關係,在java5之前通用可以寫出很優秀的多線程程式。只是代價不一樣而已。 線程池的基本思想還是一種對象池的思想,開闢一塊記憶體空間,裡面存放了眾多(未死亡)的線程,池中線程執行調度由池管理器來處理。當有線程任務時,從池中取一個,執行完成後線程對象歸池,這樣可以避免反覆創建線程對象所帶來的性能開銷,節省了系統的資源。 在Java5之前,要實現一個線程池是相當有難度的,現在Java5為我們做好了一切,我們只需要按照提供的API來使用,即可享受線程池帶來的極大便利。 Java5的線程池分好多種:固定尺寸的線程池、可變尺寸連接池。 在使用線程池之前,必須知道如何去創建一個線程池,在Java5中,需要瞭解的是java.util.concurrent.Executors類的API,這個類提供大量創建連接池的靜態方法,是必須掌握的。a、固定大小的線程池
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
/**
* Java線程:線程池-
*
* */
public class Test {
public static void main(String[] args) {
//創建一個可重用固定線程數的線程池
ExecutorService pool = Executors.newFixedThreadPool(2);
//創建實現了Runnable介面對象,Thread對象當然也實現了Runnable介面
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//將線程放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//關閉線程池
pool.shutdown();
}
}
class MyThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在執行...");
}
}
運行結果:
pool-1-thread-1正在執行...
pool-1-thread-1正在執行...
pool-1-thread-2正在執行...
pool-1-thread-1正在執行...
pool-1-thread-2正在執行...
如果將線程池的大小改為4,則運行結果如下:
pool-1-thread-2正在執行...
pool-1-thread-3正在執行...
pool-1-thread-3正在執行...
pool-1-thread-2正在執行...
pool-1-thread-1正在執行...
b、單任務線程池
在上例的基礎上改一行創建pool對象的代碼為:
//創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。
ExecutorService pool = Executors.newSingleThreadExecutor();
則,運行結果為:
pool-1-thread-1正在執行...
pool-1-thread-1正在執行...
pool-1-thread-1正在執行...
pool-1-thread-1正在執行...
pool-1-thread-1正在執行...
對於以上兩種連接池,大小都是固定的,當要加入的池的線程(或者任務)超過池最大尺寸時候,則入此線程池需要排隊等待。
一旦池中有線程完畢,則排隊等待的某個線程會入池執行。
c、可變尺寸的線程池
與上面的類似,只是改動下pool的創建方式:
//創建一個可根據需要創建新線程的線程池,但是在以前構造的線程可用時將重用它們。
ExecutorService pool = Executors.newCachedThreadPool();
運行結果如下:
pool-1-thread-1正在執行...
pool-1-thread-5正在執行...
pool-1-thread-4正在執行...
pool-1-thread-3正在執行...
pool-1-thread-2正在執行...
d、延遲連接池
package concurrent;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Java線程:線程池-
*
*
*/
public class Test {
public static void main(String[] args) {
//創建一個線程池,它可那排在給定延遲後運行命令或者定期地執行
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
//創建實現了Runnable介面對象,Thread對象當然也實現了Runnable介面
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//將線程放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
//使用延遲執行風格的方法
pool.schedule(t4, 5, TimeUnit.SECONDS);
pool.schedule(t5, 10, TimeUnit.SECONDS);
//關閉線程池
pool.shutdown();
}
}
class MyThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"正在執行...");
}
}
e、單任務連接線程池
在d的代碼基礎上,做改動
//創建一個單線程執行程式,它可安排在給定延遲後運行命令或者定期地執行。
ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
運行時,會發現,t4延遲5s後得到執行,t5延遲10s後得到執行。運行結果如下:
pool-1-thread-2正在執行...
pool-1-thread-1正在執行...
pool-1-thread-1正在執行...
pool-1-thread-2正在執行...
pool-1-thread-1正在執行...
f、自定義線程池
package concurrent;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Java線程:線程池-自定義線程池
*
*
*/
public class Test {
public static void main(String[] args) {
//創建等待隊列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
//創建一個單線程執行程式,它可安排在給定延遲後運行命令或者定期地執行。
ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,2,TimeUnit.MILLISECONDS,bqueue);
//創建實現了Runnable介面對象,Thread對象當然也實現了Runnable介面
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
Thread t6 = new MyThread();
Thread t7 = new MyThread();
//將線程放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.execute(t7);
//關閉線程池
pool.shutdown();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在執行...");
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
運行結構如下:
創建自定義線程池的構造方法很多,本例中參數的含義如下:
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
- 用給定的初始參數和預設的線程工廠及處理程式創建新的 ThreadPoolExecutor。使用
Executors
工廠方法之一比使用此通用構造方法方便得多。- 參數:
corePoolSize
- 池中所保存的線程數,包括空閑線程。maximumPoolSize
- 池中允許的最大線程數。keepAliveTime
- 當線程數大於核心時,此為終止前多餘的空閑線程等待新任務的最長時間。unit
- keepAliveTime 參數的時間單位。workQueue
- 執行前用於保持任務的隊列。此隊列僅保持由 execute 方法提交的 Runnable 任務。- 拋出:
IllegalArgumentException
- 如果 corePoolSize 或 keepAliveTime 小於零,或者 maximumPoolSize 小於或等於零,或者 corePoolSize 大於 maximumPoolSize。NullPointerException
- 如果 workQueue 為 null
二、有返回值的線程
在Java5之前,線程是沒有返回值的,常常為了“有”返回值,破費周折,而且代碼很不好寫。或者乾脆繞過這道坎,走別的路了。現在Java終於有可返回值的線程了。 可返回值的任務必須實現Callable介面,類似的,無返回值的任務必須實現Runnable介面。 執行Callable任務後,可以獲取一個Future的對象,在該對象上調用get就可以獲取到Callable任務返回的Object了。下麵是一個簡單的例子:
package MultiThread;
import java.util.concurrent.*;
/**
* Java線程:有返回值的線程
*
*
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//創建一個線程池
ExecutorService pool = Executors.newFixedThreadPool(2);
//創建兩個有返回值的任務
Callable<String> c1 = new MyCallable("A");
Callable<String> c2 = new MyCallable("B");
//執行任務並獲取Future對象
Future<String> f1 = pool.submit(c1);
Future<String> f2 = pool.submit(c2);
//從Future對象上獲取任務的返回值,並輸出到控制台
System.out.println(">>>"+f1.get().toString());
System.out.println(">>>"+f2.get().toString());
//關閉線程池
pool.shutdown();
}
}
class MyCallable implements Callable<String>{
private String oid;
MyCallable(String oid) {
this.oid = oid;
}
@Override
public String call() throws Exception {
return oid+"任務返回的內容";
}
}
運行結果:
>>>A任務返回的內容
>>>B任務返回的內容
比較簡單,要深入瞭解還需要看Callable和Future介面的API啊。
三、併發庫的鎖
在Java5中,專門提供了鎖對象,利用鎖可以方便的實現資源的封鎖,用來控制對競爭資源併發訪問的控制,這些內容主要集中在java.util.concurrent.locks 包下麵,裡面有三個重要的介面Condition、Lock、ReadWriteLock。
介面摘要 | |
---|---|
Condition | Condition 將 Object 監視器方法(wait 、notify 和 notifyAll )分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用,為每個對象提供多個等待 set(wait-set)。 |
Lock | Lock 實現提供了比使用 synchronized 方法和語句可獲得的更廣泛的鎖定操作。 |
ReadWriteLock | ReadWriteLock 維護了一對相關的鎖 ,一個用於只讀操作,另一個用於寫入操作。 |
有關鎖的介紹,API文檔解說很多,看得很煩,還是看個例子再看文檔比較容易理解
a、普通鎖
package MultiThread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Java線程:鎖
*
*
*/
public class Test {
public static void main(String[] args) {
//創建併發訪問的賬戶
MyCount myCount = new MyCount("95599200901215522", 10000);
//創建一個鎖對象
Lock lock = new ReentrantLock();
//創建一個線程池
ExecutorService pool = Executors.newCachedThreadPool();
//創建一些併發訪問用戶,一個信用卡,存的存,取的取,好熱鬧啊
UserThread ut1 = new UserThread("取款線程1", myCount, -4000, lock);
UserThread ut2 = new UserThread("存款線程1", myCount, 6000, lock);
UserThread ut3 = new UserThread("取款線程2", myCount, -8000, lock);
UserThread ut4 = new UserThread("存款線程2", myCount, 800, lock);
//線上程池中執行各個用戶的操作
pool.execute(ut1);
pool.execute(ut2);
pool.execute(ut3);
pool.execute(ut4);
//關閉線程池
pool.shutdown();
}
}
/**
* 信用卡的用戶 線程
* 多個用戶線程操作該信用卡
*/
class UserThread implements Runnable {
private String threadName; //用戶線程
private MyCount myCount; //所要操作的賬戶
private int iocash; //操作的金額,當然有正負之分了
private Lock myLock; //執行操作所需的鎖對象
UserThread(String name, MyCount myCount, int iocash, Lock myLock) {
this.threadName = name;
this.myCount = myCount;
this.iocash = iocash;
this.myLock = myLock;
}
public void run() {
//獲取鎖
myLock.lock();
//執行現金業務
System.out.println(threadName + "正在操作" + myCount + "賬戶,操作金額為" + iocash + ",當前金額為" + myCount.getCash());
myCount.setCash(myCount.getCash() + iocash);
System.out.println("\t操作成功,操作金額為" + iocash + ",當前金額為" + myCount.getCash());
//釋放鎖,否則別的線程沒有機會執行了
myLock.unlock();
}
}
/**
* 信用卡賬戶,可隨意透支
*/
class MyCount {
private String oid; //賬號
private int cash; //賬戶餘額
MyCount(String oid, int cash) {
this.oid = oid;
this.cash = cash;
}
public String getOid() {
return oid;
}
public void setOid(String oid) {
this.oid = oid;
}
public int getCash() {
return cash;
}
public void setCash(int cash) {
this.cash = cash;
}
@Override
public String toString() {
return "MyCount{" +
"oid='" + oid + '\'' +
", cash=" + cash +
'}';
}
}
運行結果:
取款線程1正在操作MyCount{oid='95599200901215522', cash=10000}賬戶,操作金額為-4000,當前金額為10000
操作成功,操作金額為-4000,當前金額為6000
存款線程1正在操作MyCount{oid='95599200901215522', cash=6000}賬戶,操作金額為6000,當前金額為6000
操作成功,操作金額為6000,當前金額為12000
存款線程2正在操作MyCount{oid='95599200901215522', cash=12000}賬戶,操作金額為800,當前金額為12000
操作成功,操作金額為800,當前金額為12800
取款線程2正在操作MyCount{oid='95599200901215522', cash=12800}賬戶,操作金額為-8000,當前金額為12800
操作成功,操作金額為-8000,當前金額為4800
從上面的輸出可以看到,利用鎖對象太方便了,比直接在某個不知情的對象上用鎖清晰多了。但一定要註意的是,在獲取了鎖對象後,用完後應該儘快釋放鎖,以便別的等待該鎖的線程有機會去執行。
b、讀寫鎖
在a中提到了Lock介面以及對象,使用它可以很優雅的控制了競爭資源的安全訪問,但是這種鎖不區分讀寫,稱這種鎖為普通鎖。為了提高性能,Java提供了讀寫鎖,在讀的地方使用讀鎖,在寫的地方使用寫鎖,靈活控制,在一定程度上提高了程式的執行效率。 Java中讀寫鎖有個介面java.util.concurrent.locks.ReadWriteLock,也有具體的實現ReentrantReadWriteLock,詳細的API可以查看JavaAPI文檔。 下麵這個例子是在文例子的基礎上,將普通鎖改為讀寫鎖,並添加賬戶餘額查詢的功能,代碼如下:package MultiThread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Java線程:鎖
*
*
*/
public class Test {
public static void main(String[] args) {
//創建併發訪問的賬戶
MyCount myCount = new MyCount("95599200901215522", 10000);
//創建一個鎖對象
ReadWriteLock lock = new ReentrantReadWriteLock();
//創建一個線程池
ExecutorService pool = Executors.newCachedThreadPool();
//創建一些併發訪問用戶線程,一個信用卡,存的存,取的取,好熱鬧啊
UserThread ut1 = new UserThread("取款線程1", myCount, -4000, lock,false);
UserThread ut2 = new UserThread("存款線程1", myCount, 6000, lock,false);
UserThread ut3 = new UserThread("取款線程2", myCount, -8000, lock,false);
UserThread ut4 = new UserThread("存款線程2", myCount, 800, lock,false);
UserThread ut5 = new UserThread("查詢", myCount, 0, lock,true);
//線上程池中執行各個用戶的操作
pool.execute(ut1);
pool.execute(ut2);
pool.execute(ut3);
pool.execute(ut4);
pool.execute(ut5);
//關閉線程池
pool.shutdown();
}
}
/**
* 信用卡的用戶 線程
* 多個用戶線程操作該信用卡
*/
class UserThread implements Runnable {
private String threadName; //用戶線程
private MyCount myCount; //所要操作的賬戶
private int iocash; //操作的金額,當然有正負之分了
private ReadWriteLock myLock; //執行操作所需的鎖對象
private boolean ischeck; //是否查詢
UserThread(String name, MyCount myCount, int iocash, ReadWriteLock myLock,boolean ischeck) {
this.threadName = name;
this.myCount = myCount;
this.iocash = iocash;
this.myLock = myLock;
this.ischeck=ischeck;
}
public void run() {
if(ischeck){
//獲取讀鎖
myLock.readLock().lock();
//執行查詢
System.out.println("讀:"+threadName + "正在查詢" + myCount + "賬戶,,當前金額為" + myCount.getCash());
//釋放獲取到的讀鎖
myLock.readLock().unlock();
}else{
//獲取寫鎖
myLock.writeLock().lock();
myCount.setCash(myCount.getCash() + iocash);
System.out.println("寫:"+threadName+"操作成功,操作金額為" + iocash + ",當前金額為" + myCount.getCash());
//釋放鎖獲取到的寫鎖
myLock.writeLock().unlock();
}
}
}
/**
* 信用卡賬戶,可隨意透支
*/
class MyCount {
private String oid; //賬號
private int cash; //賬戶餘額
MyCount(String oid, int cash) {
this.oid = oid;
this.cash = cash;
}
public String getOid() {
return oid;
}
public void setOid(String oid) {
this.oid = oid;
}
public int getCash() {
return cash;
}
public void setCash(int cash) {
this.cash = cash;
}
@Override
public String toString() {
return "MyCount{" +
"oid='" + oid + '\'' +
", cash=" + cash +
'}';
}
}
運行結果:
寫:取款線程1操作成功,操作金額為-4000,當前金額為6000
寫:取款線程2操作成功,操作金額為-8000,當前金額為-2000
寫:存款線程1操作成功,操作金額為6000,當前金額為4000
讀:查詢正在查詢MyCount{oid='95599200901215522', cash=4000}賬戶,,當前金額為4000
寫:存款線程2操作成功,操作金額為800,當前金額為4800
在實際開發中,最好在能用讀寫鎖的情況下使用讀寫鎖,而不要用普通鎖,以求更好的性能。
四、信號量
Java的信號量實際上是一個功能完畢的計數器,對控制一定資源的消費與回收有著很重要的意義,信號量常常用於多線程的代碼中,並能監控有多少數目的線程等待獲取資源,並且通過信號量可以得知可用資源的數目等等,這裡總是在強調“數目”二字,但不能指出來有哪些在等待,哪些資源可用。 因此,本人認為,這個信號量類如果能返回數目,還能知道哪些對象在等待,哪些資源可使用,就非常完美了,僅僅拿到這些概括性的數字,對精確控制意義不是很大。目前還沒想到更好的用法。下麵是一個簡單的例子:
package MultiThread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* Java線程:信號量
*
*
*/
public class Test {
public static void main(String[] args) {
MyPool myPool = new MyPool(20);
//創建線程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
MyThread t1 = new MyThread("任務A", myPool, 3);
MyThread t2 = new MyThread("任務B", myPool, 12);
MyThread t3 = new MyThread("任務C", myPool, 7);
//線上程池中執行任務
threadPool.execute(t1);
threadPool.execute(t2);
threadPool.execute(t3);
//關閉池
threadPool.shutdown();
}
}
/**
* 一個池
*/
class MyPool {
private Semaphore sp; //池相關的信號量
/**
* 池的大小,這個大小會傳遞給信號量
*
* @param size 池的大小
*/
MyPool(int size) {
this.sp = new Semaphore(size);
}
public Semaphore getSp() {
return sp;
}
public void setSp(Semaphore sp) {
this.sp = sp;
}
}
class MyThread extends Thread {
private String threadName; //線程的名稱
private MyPool pool; //自定義池
private int x; //申請信號量的大小
MyThread(String threadName, MyPool pool, int x) {
this.threadName = threadName;
this.pool = pool;
this.x = x;
}
public void run() {
try {
//從此信號量獲取給定數目的許可
pool.getSp().acquire(x);
//todo:也許這裡可以做更複雜的業務
System.out.println(threadName + "成功獲取了" + x + "個許可!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//釋放給定數目的許可,將其返回到信號量。
pool.getSp().release(x);
System.out.println(threadName + "釋放了" + x + "個許可!");
}
}
}
運行結果:
任務A成功獲取了3個許可!
任務B成功獲取了12個許可!
任務B釋放了12個許可!
任務C成功獲取了7個許可!
任務A釋放了3個許可!
任務C釋放了7個許可!
從結果可以看出,信號量僅僅是對池資源進行監控,但不保證線程的安全,因此,在使用時候,應該自己控制線程的安全訪問池資源。
五、阻塞隊列
阻塞隊列是Java5線程新特征中的內容,Java定義了阻塞隊列的介面java.util.concurrent.BlockingQueue,阻塞隊列的概念是,一個指定長度的隊列,如果隊列滿了,添加新元素的操作會被阻塞等待,直到有空位為止。同樣,當隊列為空時候,請求隊列元素的操作同樣會阻塞等待,直到有可用元素為止。 有了這樣的功能,就為多線程的排隊等候的模型實現開闢了便捷通道,非常有用。 java.util.concurrent.BlockingQueue繼承了java.util.Queue介面,可以參看API文檔。 下麵給出一個簡單應用的例子:package MultiThread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Java線程:併發庫-阻塞隊列
*
*
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> bqueue = new ArrayBlockingQueue<Integer>(20);
for (int i = 0; i < 30; i++) {
//將指定元素添加到此隊列中,如果沒有可用空間,將一直等待(如果有必要)。
bqueue.put(i);
System.out.println("向阻塞隊列中添加了元素:" + i);
}
System.out.println("程式到此運行結束,即將退出----");
}
}
運行結果:
六、阻塞棧
對於阻塞棧,與阻塞隊列相似。不同點在於棧是“後入先出”的結構,每次操作的是棧頂,而隊列是“先進先出”的結構,每次操作的是隊列頭。 這裡要特別說明一點的是,阻塞棧是Java6的新特征。、 Java為阻塞棧定義了介面:java.util.concurrent.BlockingDeque,其實現類也比較多,具體可以查看JavaAPI文檔。 下麵看一個簡單例子:package MultiThread;
import java.util.concurrent.BlockingDeque;