jdk1.7.0_79 本文實際上是對上文《13.ThreadPoolExecutor線程池之submit方法》的一個延續或者一個補充。在上文中提到的submit方法里出現了FutureTask,這不得不停止腳步將方向轉向Java的Future模式。 Future是併發編程中的一種設計模式,對於多線 ...
jdk1.7.0_79
本文實際上是對上文《13.ThreadPoolExecutor線程池之submit方法》的一個延續或者一個補充。在上文中提到的submit方法里出現了FutureTask,這不得不停止腳步將方向轉向Java的Future模式。
Future是併發編程中的一種設計模式,對於多線程來說,線程A需要等待線程B的結果,它沒必要一直等待B,可以先拿到一個未來的Future,等B有了結果後再取真實的結果。
ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> future = executor.submit(callable); //主線程需要callable線程的結果,先拿到一個未來的Future System.out.println(future.get()); //有了結果後再根據get方法取真實的結果,當然如果此時callable線程如果沒有執行完get方法會阻塞執行完,如果執行完則直接返回結果或拋出異常
也就是說,Future它代表一個非同步計算的結果。
上面就代表了Future模式的執行原理,根據網上的例子,我們可以來自己實現一個Future模式。
1 package com.future; 2 3 /** 4 * 數據結果 5 * Created by yulinfeng on 6/18/17. 6 */ 7 public interface Data { 8 String getResult() throws InterruptedException; 9 }
1 package com.future; 2 3 /** 4 * 結果的真實計算過程 5 * Created by yulinfeng on 6/18/17. 6 */ 7 public class RealData implements Data { 8 protected String data; 9 10 public RealData(String data) { 11 try { 12 System.out.println("正在計算結果"); 13 Thread.sleep(3000); //模擬計算 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 this.data = data + “ world”; 18 } 19 20 public String getResult() throws InterruptedException { 21 return data; 22 } 23 }
1 package com.future; 2 3 /** 4 * 真實結果RealData的代理 5 * Created by yulinfeng on 6/18/17. 6 */ 7 public class FutureData implements Data { 8 RealData realData = null; //對RealData的封裝,代理了RealData 9 boolean isReady = false; //真實結果是否已經準備好 10 11 public synchronized void setResultData(RealData realData) { 12 if (isReady) { 13 return; 14 } 15 this.realData = realData; 16 isReady = true; 17 notifyAll(); //realData已經被註入到了futureData中,通知getResult方法 18 } 19 20 public synchronized String getResult() throws InterruptedException { 21 if (!isReady) { 22 wait(); //數據還未計算好,阻塞等待 23 } 24 return realData.getResult(); 25 } 26 }
1 package com.future; 2 3 /** 4 * Client主要完成的功能包括:1. 返回一個FutureData;2.開啟一個線程用於構造RealData 5 * Created by yulinfeng on 6/18/17. 6 */ 7 public class Client { 8 9 public Data request(final String string) { 10 final FutureData futureData = new FutureData(); 11 12 /*計算過程比較慢,單獨放到一個線程中去*/ 13 new Thread(new Runnable() { 14 15 public void run() { 16 RealData realData = new RealData(string); 17 futureData.setResultData(realData); 18 } 19 }).start(); 20 21 return futureData; //先返回一個“假”的futureData 22 } 23 }
1 /** 2 * 負責調用Client發起請求,並使用返回的數據。 3 * Created by yulinfeng on 6/18/17. 4 */ 5 public class Main { 6 public static void main(String[] args) throws InterruptedException { 7 Client client = new Client(); 8 System.out.println("準備計算結果"); 9 Data data = client.request("hello"); //立即返回一個“假”的futureData,可以不用阻塞的等待數據返回,轉而執行其它任務 10 System.out.println("執行其它任務"); 11 Thread.sleep(3000); //模擬執行其它任務 12 System.out.println("數據的計算結果為:" + data.getResult()); 13 } 14 }
仔細閱讀以上程式對Future模式的實現不難發現,Future模式是非同步請求和代理模式的結合。當然在JDK中已經為我們實現好了Future模式。
修改RealData類:
1 package com.future; 2 3 import java.util.concurrent.Callable; 4 5 /** 6 * 結果的真實計算過程 7 * Created by yulinfeng on 6/18/17. 8 */ 9 public class RealData2 implements Callable<String> { 10 protected String data; 11 12 public RealData2(String data) { 13 this.data = data; 14 } 15 public String call() throws Exception { 16 try { 17 System.out.println("正在計算結果"); 18 Thread.sleep(2000); //模擬計算結果 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 } 22 this.data = data + " world"; 23 return data; 24 } 25 }
修改Main測試類:
1 package com.future; 2 3 import java.util.concurrent.ExecutionException; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.Future; 7 8 /** 9 * 負責調用Executor的submit,並使用返回的數據。 10 * Created by yulinfeng on 6/18/17. 11 */ 12 public class Main2 { 13 14 public static void main(String[] args) throws InterruptedException, ExecutionException { 15 ExecutorService client = Executors.newSingleThreadExecutor(); //類似Client 16 System.out.println("準備計算結果"); 17 Future<String> data = client.submit(new RealData2("hello")); //類似Client.request 18 System.out.println("執行其它任務"); 19 Thread.sleep(3000); 20 System.out.println("數據的計算結果為:" + data.get()); 21 } 22 }
現在回到上文還未解決完的AbstractExecutorService#submit方法。
類比上面的Client#request方法,在Client#request中先創建一個FutureData實例,而在AbstractExecutorService#submit中則是創建一個FutureTask實例,接著Client#request新創建一個線程用於非同步執行任務,並直接返回FutureData,而在AbstractExecutorService#submit中同樣也將任務交給了execute方法,並直接返回FutureTask。當然JDK中Future模式的實現更為複雜。
在《12.ThreadPoolExecutor線程池原理及其execute方法》中我們講解了execute方法,在ThreadPoolExecutor$Worker#runWorker方法第1145行中是對task任務的調用:
//ThreadPoolExecutor$Worker#runWorker task.run();
submit調用execute以執行run方法,實際執行的是FutureTask中的run方法。在FutureTask#run中,可以看到對任務Callable類型的task非同步的執行,以及結果的保存。