CompletableFuture非同步回調 CompletableFuture簡介 CompletableFuture被用於非同步編程,非同步通常意味著非阻塞,可以使得任務單獨允許在與主線程分離的其他線程中,並且通過回調可以在主線程中得到非同步任務的執行狀態,是否完成,和是否異常信息。 Completab ...
CompletableFuture非同步回調
CompletableFuture簡介
CompletableFuture被用於非同步編程,非同步通常意味著非阻塞,可以使得任務單獨允許在與主線程分離的其他線程中,並且通過回調可以在主線程中得到非同步任務的執行狀態,是否完成,和是否異常信息。
CompletableFuture實現了Future,CompletionStage介面,實現了Future介面可以相容線程池框架,而CompletionStage介面才是非同步編程的介面抽象,裡面定義多種非同步方法,通過這兩者集合,從而打造出了強大的CompletableFuture類。
Futrue和CompletableFuture
Future在Java裡面,通過用來表示一個非同步任務的引用,比如我們將任務提交到線程池裡面,然後我麽會得到一個Future,在Future裡面有isDone方法來判斷任務是否處理結束,該有get方法可以一直阻塞直到任務結束然後獲取結果,但整體來說這種方式,還是同步的,因為需要客戶端不斷阻塞等待或者不斷輪詢才能知道任務是否完成。
Futrue缺點
1.不支持手動完成。2.不支持進一步的非阻塞調用。3.不支持鏈式調用。4.不支持多個Future合併。5.不支持非同步處理。
CompletableFuture類的使用案例
CompletableFuture01
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* 演示CompletableFuture
* @author 長名06
* @version 1.0
*/
public class CompletableFuture01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
System.out.println("子線程開始幹活");
try {
//子線程沉睡3s
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//完成future任務
future.complete("success");
},"A").start();
System.out.println("主線程調用get方法獲取結果為:" + future.get());
System.out.println("主線程完成,阻塞結束");
}
}
CompletableFuture02
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author 長名06
* @version 1.0
*/
public class CompletableFuture02 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//非同步調用,無返回值
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "執行runSync()");
});
completableFuture1.get();
//非同步調用,有返回值
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "執行supplyAsync()");
// int i = 1/0;
return 1024;
});
completableFuture2.whenComplete((t, u) -> {
System.out.println("----t=" + t);//t參數,是執行的返回值
System.out.println("----u=" + u);//異常信息
}).get();
// System.out.println(Runtime.getRuntime().availableProcessors());
}
}
CompletableFuture03
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* 演示線程依賴,執行api thenApply()
* 一個任務,依賴於另一個任務可以使用thenApply()將兩個任務(線程)串列化
* 對一個數先加10 再平方
* @author 長名06
* @version 1.0
*/
public class CompletableFuture03 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName() + "主線程開始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任務開啟");
num += 10;
return num;
}).thenApply(i -> num * num);
Integer integer = future.get();
System.out.println("主線程結束,子線程的結果為" + integer);
}
}
CompletableFuture04
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* 消費處理結果
* thenAccept()方法,接收任務的處理結果,並消費結果,不返回結果了
* @author 長名06
* @version 1.0
*/
public class CompletableFuture04 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName() + "主線程開始");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任務開啟");
num += 10;
return num;
}).thenApply(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return num * num;
}
}).thenAccept(new Consumer<Integer>() {
@Override
public void accept(Integer i) {
System.out.println("子線程全部處理完成,最後調用了accept方法,消費了結果" + i);
}
});
}
}
CompletableFuture05
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* 異常處理
* @author 長名06
* @version 1.0
*/
public class CompletableFuture05 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName() + "主線程開始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1/0;//模擬異常
System.out.println("加10任務開啟");
num += 10;
return num;
}).exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable ex) {
System.out.println(ex.getMessage());
return -1;
}
});
}
}
CompletableFuture06
package com.shaonian.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* 消費結果,同時處理異常
* handle類似與thenAccept/thenRun方法,是最後一步結果的調用,但是同時可以處理異常
* @author 長名06
* @version 1.0
*/
public class CompletableFuture06 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName() + "主線程開始");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1/0;
System.out.println("加10任務開啟");
num += 10;
return num;
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer i, Throwable ex) {
System.out.println("進入了handle方法");
if(ex != null){
System.out.println("發生了異常,內容為" + ex.getMessage());
return -1;
}else{
System.out.println("正常執行,結果為" + i);
return i;
}
}
});
}
}
CompletableFuture07
package com.shaonian.juc.completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* 兩個CompletableFuture結果的合併
* @author 長名06
* @version 1.0
*/
public class CompletableFuture07 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
//有依賴關係的合併
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任務開啟");
num += 10;
return num;
});
//合併
CompletableFuture<Integer> future2 = future.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
@Override
public CompletionStage<Integer> apply(Integer i) {
return CompletableFuture.supplyAsync(() -> {
return i + 1;
});
}
});
System.out.println(future.get());
System.out.println(future2.get());
//無依賴的任務合併
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任務開啟");
num += 10;
return num;
});
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘10任務開啟");
num *= 10;
return num;
});
//合併兩個結果
CompletableFuture<Object> future3 = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() {
@Override
public List<Integer> apply(Integer result1, Integer result2) {
ArrayList<Integer> list = new ArrayList<>();
list.add(result1);
list.add(result2);
return list;
}
});
System.out.println("合併結果為" + future3.get());
}
}
CompletableFuture08
package com.shaonian.juc.completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
* 多個獨立任務的合併 allOf
* @author 長名06
* @version 1.0
*/
public class CompletableFuture08 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<CompletableFuture<Integer>> list = new ArrayList<>();
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
System.out.println("加10任務開啟");
num += 10;
return num;
});
list.add(job1);
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
System.out.println("乘10任務開啟");
num *= 10;
return num;
});
list.add(job2);
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
System.out.println("減10任務開啟");
num -= 10;
return num;
});
list.add(job3);
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
System.out.println("除10任務開啟");
num /= 10;
return num;
});
list.add(job4);
//使用allOf需註意,輸入也會執行任務,但是無法獲取到結果
//allOf需要等所有的任務執行完畢
/**
* 返回值是CompletableFuture<Void>類型
* public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
* return andTree(cfs, 0, cfs.length - 1);
* }
*/
// CompletableFuture<Void> allJob = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));
// System.out.println(allJob.get());
//也可以使用 join的形式,執行,可以獲取結果
List<Integer> allResult = list.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(allResult);
}
}
CompletableFuture09
package com.shaonian.juc.completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* anyOf
* @author 長名06
* @version 1.0
*/
public class CompletableFuture09 {
public static Integer num = 10;
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<CompletableFuture<Integer>> list = new ArrayList<>();
CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("加10任務開啟");
num += 10;
return num;
});
list.add(job1);
CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("乘10任務開啟");
num *= 10;
return num;
});
list.add(job2);
CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("減10任務開啟");
num -= 10;
return num;
});
list.add(job3);
CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("除10任務開啟");
num /= 10;
return num;
});
list.add(job4);
//anyOf,這裡只要有一個job執行完畢,就結束所有的任務執行,不需要等待所有的job執行完畢
//但是這個很雞肋,因為如果不要執行所有的任務,就沒必要開啟一個CompletableFuture了
//也可以適用於競爭的場景,先執行成功的獲取結果,其他的不再競爭了
CompletableFuture<Object> allJob = CompletableFuture.anyOf(list.toArray(new CompletableFuture[0]));
System.out.println(allJob.get());
}
}
只是為了記錄自己的學習歷程,且本人水平有限,不對之處,請指正。