Java的CompletableFuture,Java的多線程開發

来源:https://www.cnblogs.com/kakarotto-chen/archive/2023/05/25/17430989.html
-Advertisement-
Play Games

# 三、Java8的CompletableFuture,Java的多線程開發 ## 1、CompletableFuture的常用方法 - 以後用到再加 ```properties runAsync() :開啟非同步(創建線程執行任務),無返回值 supplyAsync() :開啟非同步(創建線程執行任務 ...


三、Java8的CompletableFuture,Java的多線程開發

1、CompletableFuture的常用方法

  • 以後用到再加
runAsync() :開啟非同步(創建線程執行任務),無返回值
supplyAsync() :開啟非同步(創建線程執行任務),有返回值
thenApply() :然後應用,適用於有返回值的結果,拿著返回值再去處理。
exceptionally():用於處理非同步任務執行過程中出現異常的情況的一個方法:返回預設值或者一個替代的 CompletableFuture 對象,從而避免系統的崩潰或異常處理的問題。
handle():類似exceptionally()


get()  :阻塞線程:主要可以: ①獲取線程中的異常然後處理異常、②設置等待時間
join() :阻塞線程:推薦使用  join()  方法,因為它沒有受到 interrupt 的干擾,不需要捕獲異常,也不需要強制類型轉換。他自己會拋出異常。


CompletableFuture.allOf()
CompletableFuture.anyOf()
  • get() 和 join() 方法區別?
    • 都可以阻塞線程 —— 等所有任務都執行完了再執行後續代碼。
CompletableFuture 中的  get()  和  join()  方法都用於獲取非同步任務的執行結果,但是在使用時需要註意以下幾點區別: 
 
1. 拋出異常的方式不同:如果非同步任務執行過程中出現異常, get()  方法會拋出 ExecutionException 異常,而  join()  方法會拋出 CompletionException 異常,這兩個異常都是繼承自 RuntimeException 的。 
 
2. 方法調用限制不同: join()  方法是不可以被中斷的,一旦調用就必須等待任務執行完成才能返回結果;而  get()  方法可以在調用時設置等待的超時時間,如果超時還沒有獲取到結果,就會拋出 TimeoutException 異常。 
 
3. 返回結果類型不同: get()  方法返回的是非同步任務的執行結果,該結果是泛型類型 T 的,需要強制轉換才能獲取真正的結果;而  join()  方法返回的是非同步任務的執行結果,該結果是泛型類型 T,不需要強制轉換。 
 
4. 推薦使用方式不同:推薦在 CompletableFuture 中使用  join()  方法,因為它沒有受到 interrupt 的干擾,不需要捕獲異常,也不需要強制類型轉換。 
 
綜上所述, get()  方法和  join()  方法都是獲取非同步任務的執行結果,但是在使用時需要根據具體場景選擇使用哪個方法。如果需要獲取執行結果並且不希望被中斷,推薦使用  join()  方法;如果需要控制等待時間或者需要捕獲異常,則可以使用  get()  方法。
  • anyOf() 和 allOf() 的區別?
CompletableFuture 是 Java 8 引入的一個強大的非同步編程工具,它支持鏈式調用、組合和轉換非同步操作等功能。其中,anyOf 和 allOf 都是 CompletableFuture 的兩個常用方法,它們的區別如下: 
 
1. anyOf:任意一個 CompletableFuture 完成,它就會跟隨這個 CompletableFuture 的結果完成,返回第一個完成的 CompletableFuture 的結果。 
 
2. allOf:所有的 CompletableFuture 都完成時,它才會跟隨它們的結果完成,返回一個空的 CompletableFuture。 
 
簡而言之,anyOf 和 allOf 的最大區別是:anyOf 任意一個 CompletableFuture 完成就跟著它的結果完成,而 allOf 所有的 CompletableFuture 完成才可以完成,並返回一個空的 CompletableFuture。 
 
舉例來說,如果有三個 CompletableFuture:f1、f2、f3,其中 f1 和 f2 可能會返回一個字元串,而 f3 可能會返回一個整數,那麼: 
 
- anyOf(f1, f2, f3) 的結果是 f1、f2、f3 中任意一個 CompletableFuture 的結果; 
- allOf(f1, f2, f3) 的結果是一個空的 CompletableFuture,它的完成狀態表示 f1、f2、f3 是否全部完成。 
 
總之,anyOf 和 allOf 在實際使用中可以根據不同的需求來選擇,它們都是 CompletableFuture 中非常強大的組合操作。

2、使用CompletableFuture

2.1、實體類準備

package com.cc.md.entity;

import lombok.Data;

/**
 * @author CC
 * @since 2023/5/24 0024
 */
@Data
public class UserCs {

    private String name;

    private Integer age;

}

2.2、常用方式

  • 無返回值推薦:開啟多線程——無返回值的——阻塞:test06
    @Resource(name = "myIoThreadPool")
    private ThreadPoolTaskExecutor myIoThreadPool;
    
    //CompletableFuture開啟多線程——無返回值的
    @Test
    public void test06() throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //迴圈,模仿很多任務
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                //第一批創建的線程數
                log.info("列印:{}", finalI);
                //模仿io流耗時
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, myIoThreadPool);
            futures.add(future);
        }
        //阻塞:多線程的任務執行。相當於多線程執行完了,再執行後面的代碼
        //如果不阻塞,上面的相當於非同步執行了。
        //阻塞方式1:可以獲取返回的異常、設置等待時間
//        futures.forEach(future -> {
//            try {
//                future.get();
//            } catch (Exception e) {
//                throw new RuntimeException(e);
//            }
//        });
        //阻塞方式2(推薦)
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        log.info("列印:都執行完了。。。");
    }
  • 有返回值推薦:開啟多線程——有返回值的,返回一個新的List——阻塞——使用stream流的map:test09
    • test07、test08 可以轉化為 test09 (現在這個)
    • 可以返回任務類型的值,不一定要返回下麵的user對象。
    @Resource(name = "myIoThreadPool")
    private ThreadPoolTaskExecutor myIoThreadPool;
    
    //CompletableFuture開啟多線程——有返回值的,返回一個新的List——先有數據的情況——使用stream流的map
    //像這種,需要構建另一個數組的,相當於一個線程執行完了,會有返回值
    //使用stream流的map + CompletableFuture.supplyAsync()
    @Test
    public void test09() throws Exception {
        //先獲取數據,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //莫法處理任務
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                    // 處理數據
                    user.setName(user.getName() + "-改");
                    log.info("列印-改:{}", user.getName());
                    // 其他的業務邏輯。。。

                    return user;
                }, myIoThreadPool)).collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有線程
                .map(CompletableFuture::join)
                //取age大於10的用戶
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

2.3、異常處理

  • exceptionally
  • handle
	//CompletableFuture 異常處理
    @Test
    public void test10() throws Exception {
        //先獲取數據,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //莫法處理任務
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                        if (user.getAge() > 5){
                            int a = 1/0;
                        }
                        // 處理數據
                        user.setName(user.getName() + "-改");
                        log.info("列印-改:{}", user.getName());
                        // 其他的業務邏輯。。。

                        return user;
                    }, myIoThreadPool)
                    //處理異常方式1:返回預設值或者一個替代的 Future 對象,從而避免系統的崩潰或異常處理的問題。
                    .exceptionally(throwable -> {
                        //可以直接獲取user
                        System.out.println("異常了:" + user);
                        //處理異常的方法……
                        //1還可以進行業務處理……比如將異常數據存起來,然後導出……
                        //2返回預設值,如:user、null
                        //return user;
                        //3拋出異常
                        throw new RuntimeException(throwable.getMessage());
                    })
                    //處理異常方式2:類似exceptionally(不推薦)
//                    .handle((userCs, throwable) -> {
//                        System.out.println("handle:" + user);
//                        if (throwable != null) {
//                            // 處理異常
//                            log.error("處理用戶信息出現異常,用戶名為:" + user.getName(), throwable);
//                            // 返回原始數據
//                            return userCs;
//                        } else {
//                            // 返回正常數據
//                            return userCs;
//                        }
//                    })
                )
                .collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有線程
                .map(CompletableFuture::join)
                //取age大於10的用戶
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

2.4、CompletableFuture的使用測試

1、推薦使用:test03、test05、test09、test10、test11

2、test07、test08就是test09的前身。


  • test01:獲取當前電腦(伺服器)的cpu核數

  • test02:線程池原始的使用(不推薦直接這樣用)

  • test03:開啟非同步1 —— @Async

  • test04:開啟非同步2 —— CompletableFuture.runAsync()

  • test05:開啟非同步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync() —— 阻塞所有非同步方法,一起提交

    • 相當於開了3個線程去執行三個不同的方法,然後執行完後一起提交。
      
  • test052:開啟非同步2的改造 —— 第一個任務執行完了,獲取到返回值,給後面的執行,可以連寫,也可以單寫。 —— 阻塞線程:get、join

  • test06:CompletableFuture開啟多線程——無返回值的

  • test07:CompletableFuture開啟多線程——無返回值的——構建一個新List

    • 1、相當於多線程執行任務,然後把結果插入到List中
      2、接收多線程的List必須是線程安全的,ArrayList線程不安全
         線程安全的List —— CopyOnWriteArrayList 替代 ArrayList
      
  • test08:CompletableFuture開啟多線程——無返回值的——構建一個新List——先有數據的情況(基本和test07是一個方法)

  • test09:CompletableFuture開啟多線程——有返回值的,返回一個新的List——先有數據的情況——使用stream流的map

  • test10:CompletableFuture 異常處理。相當於是 test09的增強,處理異常

  • test11:CompletableFuture 異常處理:如果出現異常就捨棄任務

    • 1、想了一下,出現異常後的任務確實沒有執行下去了,任務不往下執行,怎麼會發現異常呢?
      2、發現了異常任務也就完了。而且列印了異常,相當於返回了異常。
      3、未發生異常的任務會執行完成。如果發生異常都返回空,最後捨棄空的,就得到任務執行成功的 CompletableFuture
      

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓所有方式↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓


package com.cc.md;

import com.cc.md.entity.UserCs;
import com.cc.md.service.IAsyncService;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@SpringBootTest
class Test01 {

    private static final Logger log = LoggerFactory.getLogger(Test01.class);

    @Resource(name = "myIoThreadPool")
    private ThreadPoolTaskExecutor myIoThreadPool;
    /**
     * 非同步類
     */
    @Resource
    private IAsyncService asyncService;

    @Test
    void test01() {
        //獲取當前jdk能調用的CPU個數(當前伺服器的處理器個數)
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println(i);
    }

    //線程池原始的使用
    @Test
    void test02() {
        try {
            for (int i = 0; i < 1000; i++) {
                int finalI = i;
                myIoThreadPool.submit(() -> {
                    //第一批創建的線程數
                    log.info("列印:{}", finalI);
                    //模仿io流耗時
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }catch(Exception e){
            throw new RuntimeException(e);
        }finally {
            myIoThreadPool.shutdown();
        }
    }

    //開啟非同步1 —— @Async
    @Test
    public void test03() throws Exception {
        log.info("列印:{}", "非同步測試的-主方法1");
        asyncService.async1();
        asyncService.async2();
        //不會等待非同步方法執行,直接返回前端數據
        log.info("列印:{}", "非同步測試的-主方法2");
    }

    //開啟非同步2 —— CompletableFuture.runAsync()
    @Test
    public void test04() throws Exception {
        log.info("列印:{}", "非同步測試的-主方法1");
        CompletableFuture.runAsync(() -> {
            log.info("列印:{}", "非同步方法1!");
            //非同步執行的代碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2("非同步方法1!-end");
        }, myIoThreadPool);
        //不會等待非同步方法執行,直接返回前端數據
        log.info("列印:{}", "非同步測試的-主方法2");
    }

    //非同步需要執行的方法,可以寫在同一個類中。
    private void async2(String msg) {
        //模仿io流耗時
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("列印:{}", msg);
    }

    //開啟非同步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync()  —— 阻塞所有非同步方法,一起提交
    //相當於開了3個線程去執行三個不同的方法,然後執行完後一起提交。
    @Test
    public void test05() throws Exception {
        log.info("列印:{}", "非同步測試的-主方法1");
        //非同步執行1
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            log.info("列印:{}", "非同步方法1!");
            //非同步執行的代碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2("非同步方法1-end");
            return "非同步方法1";
        }, myIoThreadPool);

        //非同步執行2
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            log.info("列印:{}", "非同步方法2!");
            //非同步執行的代碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2("非同步方法2-end");
            return "非同步方法2";
        }, myIoThreadPool);

        //非同步執行3,不用我們自己的線程池 —— 用的就是系統自帶的 ForkJoinPool 線程池
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            log.info("列印:{}", "非同步方法3!");
            //非同步執行的代碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2("非同步方法3-end");
        });

        //阻塞所有非同步方法,一起提交後才走下麵的代碼
        CompletableFuture.allOf(future1, future2, future3).join();

        log.info("列印:{}", "非同步-阻塞-測試的-主方法2-end");
    }

    //開啟非同步2的改造 —— 第一個任務執行完了,獲取到返回值,給後面的執行,可以連寫,也可以單寫。 —— 阻塞線程:get、join
    // CompletableFuture 的 get 和 join 方法區別:
    // get:①可以獲取線程中的異常、②設置等待時間
    // join:推薦在 CompletableFuture 中使用  join()  方法,因為它沒有受到 interrupt 的干擾,不需要捕獲異常,也不需要強制類型轉換。
    @Test
    public void test052() throws Exception {
        log.info("列印:{}", "非同步測試的-主方法1");
        //非同步執行1
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            log.info("列印:{}", "非同步方法1!");
            // 非同步執行的代碼,也可以是方法,該方法不用單獨寫到其他類中。
            String str = "非同步方法1-end";
            this.async2(str);
            return str;
        }, myIoThreadPool);

        // 非同步執行2 - 無返回值 —— 分開寫的方式
        CompletableFuture<Void> future2 = future1.thenAccept(str1 -> {
            log.info("列印:{}", "非同步方法2!");
            // 非同步執行的代碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2(String.format("%s-加-非同步方法2! - 無返回值 - ",str1));
        });

        // 非同步執行3 - 有返回值 —— 分開寫future1,連寫future3方式
        CompletableFuture<String> future3 = future1.thenApply(str2 -> {
            log.info("列印:{}", "非同步方法3!");
            // 非同步執行的代碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2(String.format("%s-加-非同步方法3! - 有返回值 - ", str2));
            return "非同步執行3 - 有返回值 ";

            //連寫的方式。
        }).thenApply(str3 -> {
            String format = String.format("%s- end", str3);
            log.error("非同步3然後應用 - {}", format);
            //返回後面的應用
            return format;
        });
        // 獲取future3的返回值:
        //如果需要捕獲異常、設置等待超時時間,則用get
        log.info("future3的返回值(不阻塞):{}", future3.get());
//        log.info("future3的返回值(不阻塞-設置等待時間,超時報錯:TimeoutException):{}",
//                future3.get(2, TimeUnit.SECONDS));
        //推薦使用 join方法
//        log.info("future3的返回值(阻塞):{}", future3.join());

        //阻塞所有非同步方法,一起提交後才走下麵的代碼
        CompletableFuture.allOf(future1, future2).join();

        log.info("列印:{}", "非同步-阻塞-測試的-主方法2-end");
    }

    //CompletableFuture開啟多線程——無返回值的
    @Test
    public void test06() throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //迴圈,模仿很多任務
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                //第一批創建的線程數
                log.info("列印:{}", finalI);
                //模仿io流耗時
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, myIoThreadPool);
            futures.add(future);
        }
        //阻塞:多線程的任務執行。相當於多線程執行完了,再執行後面的代碼
        //如果不阻塞,上面的相當於非同步執行了。
        //阻塞方式1:可以獲取返回的異常、設置等待時間
//        futures.forEach(future -> {
//            try {
//                future.get();
//            } catch (Exception e) {
//                throw new RuntimeException(e);
//            }
//        });
        //阻塞方式2(推薦)
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        log.info("列印:都執行完了。。。");
    }

    //CompletableFuture開啟多線程——無返回值的——構建一個新List
    //相當於多線程執行任務,然後把結果插入到List中
    //接收多線程的List必須是線程安全的,ArrayList線程不安全
    //線程安全的List —— CopyOnWriteArrayList 替代 ArrayList
    @Test
    public void test07() throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //存數據的List
        List<UserCs> addList = new CopyOnWriteArrayList<>();
        //迴圈,模仿很多任務
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                log.info("列印:{}", finalI);
                UserCs userCs = new UserCs();
                userCs.setName(String.format("姓名-%s", finalI));
                userCs.setAge(finalI);
                addList.add(userCs);
            }, myIoThreadPool);
            futures.add(future);
        }
        //阻塞
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

        //返回新的List:endList,取age大於10的用戶
        List<UserCs> endList = addList.stream()
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

    //CompletableFuture開啟多線程——無返回值的——構建一個新List——先有數據的情況
    //用CopyOnWriteArrayList 替代 ArrayList接收
    @Test
    public void test08() throws Exception {
        //先獲取數據,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //開啟多線程
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //存數據的List
        List<UserCs> addList = new CopyOnWriteArrayList<>();
        //莫法處理任務
        users.forEach(user -> {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                //添加數據
                user.setName(user.getName() + "-改");
                addList.add(user);

                log.info("列印-改:{}", user.getName());
                //其他的業務邏輯。。。

            }, myIoThreadPool);
            futures.add(future);
        });

        //阻塞
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

        //返回新的List:endList
        List<UserCs> endList = addList.stream()
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

    //CompletableFuture開啟多線程——有返回值的,返回一個新的List——先有數據的情況——使用stream流的map
    //像這種,需要構建另一個數組的,相當於一個線程執行完了,會有返回值
    //使用stream流的map + CompletableFuture.supplyAsync()
    @Test
    public void test09() throws Exception {
        //先獲取數據,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //莫法處理任務
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                    // 處理數據
                    user.setName(user.getName() + "-改");
                    log.info("列印-改:{}", user.getName());
                    // 其他的業務邏輯。。。

                    return user;
                }, myIoThreadPool)).collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有線程
                .map(CompletableFuture::join)
                //取age大於10的用戶
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

    //基礎數據
    private List<UserCs> getUserCs() {
        List<UserCs> users = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            UserCs userCs = new UserCs();
            userCs.setName(String.format("姓名-%s", i));
            userCs.setAge(i);
            users.add(userCs);
        }
        return users;
    }

    //CompletableFuture 異常處理
    @Test
    public void test10() throws Exception {
        //先獲取數據,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //莫法處理任務
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                        if (user.getAge() > 5){
                            int a = 1/0;
                        }
                        // 處理數據
                        user.setName(user.getName() + "-改");
                        log.info("列印-改:{}", user.getName());
                        // 其他的業務邏輯。。。

                        return user;
                    }, myIoThreadPool)
                    //處理異常方式1:返回預設值或者一個替代的 Future 對象,從而避免系統的崩潰或異常處理的問題。
                    .exceptionally(throwable -> {
                        //可以直接獲取user
                        System.out.println("異常了:" + user);
                        //處理異常的方法……
                        //1還可以進行業務處理……比如將異常數據存起來,然後導出……
                        //2返回預設值,如:user、null
                        //return user;
                        //3拋出異常
                        throw new RuntimeException(throwable.getMessage());
                    })
                    //處理異常方式2:類似exceptionally(不推薦)
//                    .handle((userCs, throwable) -> {
//                        System.out.println("handle:" + user);
//                        if (throwable != null) {
//                            // 處理異常
//                            log.error("處理用戶信息出現異常,用戶名為:" + user.getName(), throwable);
//                            // 返回原始數據
//                            return userCs;
//                        } else {
//                            // 返回正常數據
//                            return userCs;
//                        }
//                    })
                )
                .collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有線程
                .map(CompletableFuture::join)
                //取age大於10的用戶
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

    //CompletableFuture 異常處理:如果出現異常就捨棄任務。
    // 想了一下,出現異常後的任務確實沒有執行下去了,任務不往下執行,怎麼會發現異常呢?
    // 發現了異常任務也就完了。而且列印了異常,相當於返回了異常。
    // 未發生異常的任務會執行完成。如果發生異常都返回空,最後捨棄空的,就得到任務執行成功的 CompletableFuture
    @Test
    public void test11() {
        List<UserCs> users = getUserCs();
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                            if (user.getAge() > 15) {
                                int a = 1 / 0;
                            }
                            user.setName(user.getName() + "-改");
                            log.info("列印-改:{}", user.getName());
                            return user;
                        }, myIoThreadPool)
                        //處理異常
                        .exceptionally(throwable -> {
                            //其他處理異常的邏輯

                            return null;
                        })
                )
                //捨棄返回的對象是null的 CompletableFuture
                .filter(e -> Objects.nonNull(e.join())).collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有線程
                .map(CompletableFuture::join)
                //取age大於10的用戶
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);

    }

}


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • [toc] # 高階函數 高階函數是將函數用作參數或返回值的函數,還可以把函數賦值給一個變數。 所有函數類型都有一個圓括弧括起來的參數類型列表以及一個返回類型:(A, B) -> C 表示接受類型分別為 A 與 B 兩個參數並返回一個 C 類型值的函數類型。 參數類型列表可以為空,如 () -> A ...
  • 本文將為大家詳細講解Java中的Map集合,這是我們進行開發時經常用到的知識點,也是大家在學習Java中很重要的一個知識點,更是我們在面試時有可能會問到的問題。文章較長,乾貨滿滿,建議大家收藏慢慢學習。文末有本文重點總結,主頁有全系列文章分享。技術類問題,歡迎大家和我們一起交流討論! ...
  • # 0.相關確定 本教程使用的版本號為專業版PyCharm 2022.3.2,如果您是初學者,為了更好的學習本教程,避免不必要的麻煩,請您下載使用與本教程一致的版本號。 # 1.PyCharm的下載 官網下載:https://www.jetbrains.com/pycharm/download/ot ...
  • Servlet是web體系裡面最重要的部分,下麵羅列幾道常見的面試題,小伙伴們一定要好好記住哈。 1.Servlet是單例的嗎,如何證明? Servlet一般都是單例的,並且是多線程的。如何證明Servlet是單例模式呢?很簡單,重寫Servlet的init方法,或者添加一個構造方法。然後,在web ...
  • Rocksdb作為當下nosql中性能的代表被各個存儲組件(mysql、tikv、pmdk、bluestore)作為存儲引擎底座,其基於LSM tree的核心存儲結構(將隨機寫通過數據結構轉化為順序寫)來提供高性能的寫吞吐時保證了讀性能。同時大量的併發性配置來降低compaction的影響。 ...
  • 本篇為[用go設計開發一個自己的輕量級登錄庫/框架吧]的封禁業務篇,會講講封禁業務的實現,給庫/框架增加新的功能。源碼:https://github.com/weloe/token-go ...
  • 哈嘍大家好,我是鹹魚 幾天前,IBM 工程師 Martin Heinz 發文表示 python 3.12 版本回引入"Per-Interpreter GIL”,有了這個 Per-Interpreter 全局解釋器鎖,python 就能實現真正意義上的並行/併發 我們知道,python 的多線程/進程 ...
  • ## 1. ThreadLocal 是什麼 JDK 對`ThreadLocal`的描述為: > 此類提供線程局部變數。這些變數與普通變數的不同之處在於,每個訪問一個變數的線程(通過其get或set方法)都有自己的、獨立初始化的變數副本。ThreadLocal 實例通常是類中的私有靜態欄位,這些欄位希 ...
一周排行
    -Advertisement-
    Play Games
  • 概述:在C#中,++i和i++都是自增運算符,其中++i先增加值再返回,而i++先返回值再增加。應用場景根據需求選擇,首碼適合先增後用,尾碼適合先用後增。詳細示例提供清晰的代碼演示這兩者的操作時機和實際應用。 在C#中,++i 和 i++ 都是自增運算符,但它們在操作上有細微的差異,主要體現在操作的 ...
  • 上次發佈了:Taurus.MVC 性能壓力測試(ap 壓測 和 linux 下wrk 壓測):.NET Core 版本,今天計劃準備壓測一下 .NET 版本,來測試並記錄一下 Taurus.MVC 框架在 .NET 版本的性能,以便後續持續優化改進。 為了方便對比,本文章的電腦環境和測試思路,儘量和... ...
  • .NET WebAPI作為一種構建RESTful服務的強大工具,為開發者提供了便捷的方式來定義、處理HTTP請求並返迴響應。在設計API介面時,正確地接收和解析客戶端發送的數據至關重要。.NET WebAPI提供了一系列特性,如[FromRoute]、[FromQuery]和[FromBody],用 ...
  • 原因:我之所以想做這個項目,是因為在之前查找關於C#/WPF相關資料時,我發現講解圖像濾鏡的資源非常稀缺。此外,我註意到許多現有的開源庫主要基於CPU進行圖像渲染。這種方式在處理大量圖像時,會導致CPU的渲染負擔過重。因此,我將在下文中介紹如何通過GPU渲染來有效實現圖像的各種濾鏡效果。 生成的效果 ...
  • 引言 上一章我們介紹了在xUnit單元測試中用xUnit.DependencyInject來使用依賴註入,上一章我們的Sample.Repository倉儲層有一個批量註入的介面沒有做單元測試,今天用這個示例來演示一下如何用Bogus創建模擬數據 ,和 EFCore 的種子數據生成 Bogus 的優 ...
  • 一、前言 在自己的項目中,涉及到實時心率曲線的繪製,項目上的曲線繪製,一般很難找到能直接用的第三方庫,而且有些還是定製化的功能,所以還是自己繪製比較方便。很多人一聽到自己畫就害怕,感覺很難,今天就分享一個完整的實時心率數據繪製心率曲線圖的例子;之前的博客也分享給DrawingVisual繪製曲線的方 ...
  • 如果你在自定義的 Main 方法中直接使用 App 類並啟動應用程式,但發現 App.xaml 中定義的資源沒有被正確載入,那麼問題可能在於如何正確配置 App.xaml 與你的 App 類的交互。 確保 App.xaml 文件中的 x:Class 屬性正確指向你的 App 類。這樣,當你創建 Ap ...
  • 一:背景 1. 講故事 上個月有個朋友在微信上找到我,說他們的軟體在客戶那邊隔幾天就要崩潰一次,一直都沒有找到原因,讓我幫忙看下怎麼回事,確實工控類的軟體環境複雜難搞,朋友手上有一個崩潰的dump,剛好丟給我來分析一下。 二:WinDbg分析 1. 程式為什麼會崩潰 windbg 有一個厲害之處在於 ...
  • 前言 .NET生態中有許多依賴註入容器。在大多數情況下,微軟提供的內置容器在易用性和性能方面都非常優秀。外加ASP.NET Core預設使用內置容器,使用很方便。 但是筆者在使用中一直有一個頭疼的問題:服務工廠無法提供請求的服務類型相關的信息。這在一般情況下並沒有影響,但是內置容器支持註冊開放泛型服 ...
  • 一、前言 在項目開發過程中,DataGrid是經常使用到的一個數據展示控制項,而通常表格的最後一列是作為操作列存在,比如會有編輯、刪除等功能按鈕。但WPF的原始DataGrid中,預設只支持固定左側列,這跟大家習慣性操作列放最後不符,今天就來介紹一種簡單的方式實現固定右側列。(這裡的實現方式參考的大佬 ...