調用同步鎖的wait()、notify()、notifyAll()進行線程通信 看這個經典的存取款問題,要求兩個線程存款,兩個線程取款,賬戶里有餘額的時候只能取款,沒餘額的時候只能存款,存取款金額相同。相當於存取款交替進行,金額相同。 線程間通信,需要通過同一個同步監視器(也就是this或者顯式的O ...
調用同步鎖的wait()、notify()、notifyAll()進行線程通信
- 看這個經典的存取款問題,要求兩個線程存款,兩個線程取款,賬戶里有餘額的時候只能取款,沒餘額的時候只能存款,存取款金額相同。相當於存取款交替進行,金額相同。
- 線程間通信,需要通過同一個同步監視器(也就是this或者顯式的Object對象)調用通信方法,
- Object有三個方法,可以用於線程間通信
- wait()
- 當前線程等待,並釋放同步鎖
- wait():無限期等待
- wait(long timeout):等待timeout毫秒,
- wait(long timeout,int nanos):等待timeout毫秒+nanos納秒,nanos的範圍[0,999999]
- notify()
- 喚醒該同步監視器上的任意一個線程
- 只有當前線程調用了wait()方法後,被notify()喚醒的線程才會喚醒
- notifyAll()
- 喚醒該同步監視器上的所有線程
- 只有當前線程調用了wait()方法後,被notify()喚醒的線程才會喚醒
- wait()
- 看示例代碼:
package testpack;
public class Test1 {
public static void main(String[] args){
Account ac=new Account("A123",0.0);
new Deposit("存款者A",ac,325.0).start(); //這裡開啟兩個存款線程
new Withdraw("取款者甲",ac,325.0).start(); //開啟兩個取款線程
new Deposit("存款者B",ac,325.0).start();
new Withdraw("取款者乙",ac,325.0).start();
}
}
class Withdraw extends Thread{ //取款任務
private Account account;
private double withdrawAmount;
public Withdraw (String threadName,Account account,double withdrawAmount){
super(threadName);
this.account=account;
this.withdrawAmount=withdrawAmount;
}
public void run(){
for (int i=1;i<=2;i++){ //每個線程迴圈取款2次
account.withdraw(withdrawAmount);
}
}
}
class Deposit extends Thread{ //存款任務
private Account account;
private double depositAmount;
public Deposit (String threadName,Account account,double depositAmount){
super(threadName);
this.account=account;
this.depositAmount=depositAmount;
}
public void run(){
for (int i=1;i<=2;i++){ //每個線程迴圈存款2次
account.deposit(depositAmount);
}
}
}
class Account {
private String accountNO;
private double balance; //賬戶餘額
private boolean flag=false; //用於判斷該賬戶是否可以進行存款或取款
public Account(){}
public Account(String no,double balance){
accountNO=no;
this.balance=balance;
}
public double getBalance(){
return balance;
}
public synchronized void withdraw(double amount){ //同步方法,取款
try {
while (!flag){ //標記㈠。特別註意,這裡用while進行迴圈判斷,而不是用if-else判斷
this.wait(); //flag為false,則不可取款,線程等待,並釋放同步鎖
}
System.out.println(Thread.currentThread().getName()+"取款:"+amount);
balance-=amount;
System.out.println("取款後,餘額為: "+balance);
flag=false; //取款完畢後,將flag切換為false,下一個線程如果是取款線程,則不能取款
System.out.println("---------------上面取款完畢-------------------");
this.notifyAll(); //標記㈢。取款完畢,喚醒其他所有線程
}catch(InterruptedException ex){
ex.printStackTrace();
}
}
public synchronized void deposit(double amount){ //同步方法,存款
try{
while (flag){ //標記㈡。特別註意,這裡用while進行迴圈判斷,而不是用if-else判斷
this.wait(); //如果flag為true,則不能存款,線程等待並釋放同步鎖
}
System.out.println(Thread.currentThread().getName()+"存款"+amount);
balance+=amount;
System.out.println("存款後,賬戶餘額為: "+balance);
flag=true; //存款完畢後,將flag切換為true,下一個線程如果是存款線程,則不能存款
System.out.println("---------------上面存款完畢-------------------");
this.notifyAll(); //標記㈣存款完畢後,喚醒其他所有線程
}catch(InterruptedException ex){
ex.printStackTrace();
}
}
}
輸出:
存款者A存款325.0
存款後,賬戶餘額為: 325.0
---------------上面存款完畢-------------------
取款者乙取款:325.0
取款後,餘額為: 0.0
---------------上面取款完畢-------------------
存款者B存款325.0
存款後,賬戶餘額為: 325.0
---------------上面存款完畢-------------------
取款者甲取款:325.0
取款後,餘額為: 0.0
---------------上面取款完畢-------------------
存款者B存款325.0
存款後,賬戶餘額為: 325.0
---------------上面存款完畢-------------------
取款者乙取款:325.0
取款後,餘額為: 0.0
---------------上面取款完畢-------------------
存款者A存款325.0
存款後,賬戶餘額為: 325.0
---------------上面存款完畢-------------------
取款者甲取款:325.0
取款後,餘額為: 0.0
---------------上面取款完畢-------------------
- 看上面的輸出:存款者A和B,取款者甲和乙分別各進行了2次存款或取款操作,並且交替執行
- 看上面的標記㈢和㈣
- 這裡只能使用notifyAll(),而不能使用notify()方法,因為可能導致程式阻塞,比如:
- 存款A線程第一次存款完畢,喚醒一個線程(當然第一次沒有線程可供喚醒)並再次執行,wait()。狀態:A阻塞+B甲乙就緒
- 存款B線程試圖存款,失敗,wait()。狀態:AB+甲乙
- 取款甲線程第一次取款完畢,喚醒存款A線程,並再次執行,wait()。狀態:B甲+A乙
- 取款乙線程試圖取款,失敗,wait()。狀態:B甲乙+A
- 存款A線程第二次存款完畢,喚醒存款B線程,並再次執行,wait()。狀態:甲乙A+B
- 存款B線程試圖存款,失敗,wait()。狀態:AB甲乙均處於wait()狀態
- 此時,四個線程都處於阻塞狀態
- 再看上面的標記㈠和㈡
- 上面這段代碼主要來源於《瘋狂Java講義 第三版》的“codes\16\16.6\synchronized”目錄
- 原代碼用的if-else對flag進行判斷,這裡存在問題,直接導致不論存款(或取款)成功或失敗(即wait),run()方法的迴圈計數器都會自增1,導致存款(或取款)次數比預計的少,進而導致存款(取款線程已執行完,而存款線程仍在執行)或取款(存款線程已執行完,而取款線程仍在執行)線程阻塞
- 應當採用while進行迴圈判斷,線程被喚醒之後,應再次進行判斷,而不是直接將迴圈計數器自增,可以保證在每個迴圈中都成功進行了一次存款
調用Condition對象的的await()、signal()、signalAll()方法實現線程間通信
- 上面Object的wait()、notify()、notifyAll()方法只能適用於this、顯式的Object對象
- 對於用Lock進行加鎖的同步方法,上面的三個方法則不適用,這時候得靠Condition對象的另外三個方法
- 通過Lock鎖的newCondition()方法返回一個Condition對象,然後調用該對象的下麵三個方法進行通信
- await()
- 類似於wait()方法
- await(long timeout,int nanos)
- awaitnanos(long nanosTimeout)
- awaitUninterruptibly()
- awaitUntil(Date deadline)
- signal()
- 類似於notify()
- signalAll()
- 類似於notifyAll()
- await()
- Lock鎖的newCondition()方法返回的是ConditionObject對象,這是AbstractQueuedSynchronizer抽象類的一個內部類,該內部類實現了Condition介面
- 下麵用Lock及這三個新方法改寫上面的Account類
class Account {
private String accountNO;
private double balance;
private boolean flag=false;
private final ReentrantLock lock=new ReentrantLock(); //創建一把Lock鎖
private final Condition cond=lock.newCondition(); //返回Condition對象
public Account(){}
public Account(String no,double balance){
accountNO=no;
this.balance=balance;
}
public double getBalance(){
return balance;
}
public void withdraw(double amount){
lock.lock(); //獲取鎖並加鎖
try {
while (!flag){
cond.await(); //調用Condition對象的await()方法
}
System.out.println(Thread.currentThread().getName()+"取款:"+amount);
balance-=amount;
System.out.println("取款後,餘額為: "+balance);
flag=false;
System.out.println("---------------上面取款完畢-------------------");
cond.signalAll();
}catch(InterruptedException ex){
ex.printStackTrace();
}finally{
lock.unlock(); //釋放鎖
}
}
public void deposit(double amount){
lock.lock();
try{
while (flag){
cond.await();
}
System.out.println(Thread.currentThread().getName()+"存款"+amount);
balance+=amount;
System.out.println("存款後,賬戶餘額為: "+balance);
flag=true;
System.out.println("---------------上面存款完畢-------------------");
cond.signalAll();
}catch(InterruptedException ex){
ex.printStackTrace();
}finally{
lock.unlock();
}
}
}
如果調用了Lock對象的wait()、notify()、notifyAll()方法會怎樣?
- Lock對象也是Object的子類的實例,也擁有這三個方法,按理說調用Lock對象這個同步監視器的該三個方法,也應該能達到通信的目的
- 改寫後,程式輸出如下:
存款者A存款325.0Exception in thread "存款者A" Exception in thread "取款者甲" //
存款後,賬戶餘額為: 325.0
---------------上面存款完畢-------------------
取款者甲取款:325.0
取款後,餘額為: 0.0
---------------上面取款完畢-------------------
存款者B存款325.0
存款後,賬戶餘額為: 325.0
---------------上面存款完畢-------------------
Exception in thread "存款者B" 取款者乙取款:325.0
取款後,餘額為: 0.0
java.lang.IllegalMonitorStateException
---------------上面取款完畢-------------------
at java.lang.Object.notifyAll(Native Method)
at testpack.Account.deposit(Test1.java:86)
at testpack.Deposit.run(Test1.java:39)
Exception in thread "取款者乙" java.lang.IllegalMonitorStateException
at java.lang.Object.notifyAll(Native Method)
at testpack.Account.withdraw(Test1.java:68)
at testpack.Withdraw.run(Test1.java:25)
java.lang.IllegalMonitorStateException
at java.lang.Object.notifyAll(Native Method)
at testpack.Account.withdraw(Test1.java:68)
at testpack.Withdraw.run(Test1.java:25)
java.lang.IllegalMonitorStateException
at java.lang.Object.notifyAll(Native Method)
at testpack.Account.deposit(Test1.java:86)
at testpack.Deposit.run(Test1.java:39)
- 上面出現了大量的“IllegalMonitorStateException”異常,暫時還分析不了出錯的原因
通過阻塞隊列實現線程間通信
- 上面的Account的取款、存款問題,抽象一下:一個Account,兩個任務(一個存款、一個取款),每個任務兩條線程(但兩條線程完成的並不是同一項任務)
BlockingQueue是一個阻塞隊列介面,它有很多實現類,見下圖:來源於《Java瘋狂講義 第三版》
- 實現類:
- ArrayBlockingQueue:基於數組實現
- LinkedBlockingQueue:基於鏈表實現
- PriorityBlockingQueue:內部元素按照排序器排序,並非先進先出
- SynchronousQueue:同步隊列,存取交替進行
- DelayQueue:內部元素實現Delay介面,內部元素按照getDelay()的返回值排序
- 該介面是Queue的子介面,但並不是作為容器使用,而是作為線程同步工具使用。
- 當一個線程要往裡面put()一個元素時,若隊列已滿,則線程阻塞
- 當一個線程從裡面take()一個元素時,若隊列為空,則線程阻塞
- 三類方法
- 在隊列尾部插入元素:若隊列已滿,分別會:
- add(E e):拋出異常
- offer(E e):返回false
- put(E e):阻塞隊列
- 在隊列頭部取出元素,並刪除元素:若隊列為空,分別會:
- remove():拋出異常
- poll():返回false
- take():阻塞隊列
- 在隊列頭部取出元素,但不刪除元素:若隊列為空,分別會:
- element():拋出異常
- peek():返回false
- 在隊列尾部插入元素:若隊列已滿,分別會:
見示例:
package testpack;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Test2 {
public static void main(String[] args){
BlockingQueue<String> bq=new ArrayBlockingQueue<>(1);
new Producer(bq,"生產者A").start();
new Producer(bq,"生產者B").start();
new Consumer(bq,"消費者X").start(); //兩個生產者,一個消費者,會產生阻塞
}
}
class Producer extends Thread{
private BlockingQueue<String> bq;
Producer(BlockingQueue bq,String name){
super(name);
this.bq=bq;
}
public void run(){ //run()方法沒有被同步,for迴圈中的代碼可能被分開執行
String[] str={"A","B","C"};
for (int i=0;i<3;i++){
System.out.println(getName()+" 準備向阻塞隊列中添加元素");
try{
bq.put(str[i%3]);
}catch(InterruptedException ex){
ex.printStackTrace();
}
System.out.println(getName()+"添加元素完成: "+bq);
}
}
}
class Consumer extends Thread{
private BlockingQueue<String> bq;
Consumer(BlockingQueue bq,String name){
super(name);
this.bq=bq;
}
public void run(){
for (int i=0;i<3;i++){
System.out.println(getName()+" 準備從阻塞隊列中取出元素");
try{
System.out.println(getName()+"取出元素成功: "+bq.take());
}catch(InterruptedException ex){
ex.printStackTrace();
}
}
}
}
- 輸出結果如下:
生產者A 準備向阻塞隊列中添加元素 //線程A被中斷,可能在添加成功前或後
生產者B 準備向阻塞隊列中添加元素 //線程B可能被中斷,可能被阻塞
生產者A添加元素完成: [M] //線程A添加成功
生產者A 準備向阻塞隊列中添加元素 //線程A阻塞
消費者X 準備從阻塞隊列中取出元素
消費者X取出元素成功: M //線程X取出成功
消費者X 準備從阻塞隊列中取出元素 //線程X被阻塞
生產者B添加元素完成: [M] //線程B添加成功
生產者A添加元素完成: [N] //這裡之所以連續添加2次,因為X已將元素取出,但沒有輸出
消費者X取出元素成功: M //X將取出的元素輸出
生產者A 準備向阻塞隊列中添加元素 //線程A被阻塞或中斷
生產者B 準備向阻塞隊列中添加元素 //線程B被阻塞或中斷
消費者X 準備從阻塞隊列中取出元素
消費者X取出元素成功: N //X將取出的元素輸出
生產者A添加元素完成: [K] //三次消費已執行結束,生產者線程還在執行,程式阻塞
- ArrayBlockingQueue內部定義了一把private的ReentrantLock鎖,在創建對象時創建鎖對象(false策略)
- 在put()/take()阻塞的時候,會釋放ReentrantLock鎖對象
- 該示例存在的問題:生產和消費的run()方法沒有被同步,導致輸出的信息錯亂;如果在run()中設置同步代碼塊,用bq做鎖,則在生產方阻塞的時候導致死鎖,暫時還不會解決。