- Future :獲取非同步返回的結果需要使用輪詢的方式,消耗cup ```java ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit( ...
-
Future :獲取非同步返回的結果需要使用輪詢的方式,消耗cup
ExecutorService executorService = Executors.newFixedThreadPool(10); Future<String> future = executorService.submit(()->{ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "future"; }); while(true){ if(future.isDone()){ System.out.println(future.get()); break; } }
-
CompletableFuture:採用觀察者模式,阻塞獲取非同步返回的結果,性能得到優化
System.out.println("=============CompletableFuture==================="); CompletableFuture testFuture1 = CompletableFuture.supplyAsync(()->{ return "麗麗1"; }).thenApply((element)->{ System.out.println("testFuture1後續操作:"+element); return "麗麗2"; }); System.out.println(testFuture1.get()); System.out.println("=============CompletableFuture==================="); CompletableFuture testFuture2 = CompletableFuture.supplyAsync(()->{ return "麗麗1"; }).thenAccept((element)->{ System.out.println("testFuture2後續操作:"+element); }); System.out.println(testFuture2.get());
-
CompletableFuture的使用明細
- 官方文檔:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio
* runAsync 無返回值 * supplyAsync 有返回值 * * thenAccept 無返回值 * thenApply 有返回值 * thenRun 不關心上一步執行結果,執行下一個操作 * get() 為阻塞獲取 可設置超時時間 避免長時間阻塞
實現介面 AsyncFunction 用於請求分發 定義一個callback回調函數,該函數用於取出非同步請求的返回結果,並將返回的結果傳遞給ResultFuture 對DataStream的數據使用Async操作
-
例子
/** * An implementation of the 'AsyncFunction' that sends requests and sets the callback. * 通過向資料庫發送非同步請求並設置回調方法 */ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { /** The database specific client that can issue concurrent requests with callbacks 可以非同步請求的特定資料庫的客戶端 */ private transient DatabaseClient client; @Override public void open(Configuration parameters) throws Exception { client = new DatabaseClient(host, post, credentials); } @Override public void close() throws Exception { client.close(); } @Override public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { // issue the asynchronous request, receive a future for result // 發起一個非同步請求,返回結果的 future final Future<String> result = client.query(key); // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future // 設置請求完成時的回調.將結果傳遞給 result future CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return result.get(); } catch (InterruptedException | ExecutionException e) { // Normally handled explicitly. return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult))); }); } } // create the original stream // 創建一個原始的流 DataStream<String> stream = ...; // apply the async I/O transformation // 添加一個 async I/O ,指定超時時間,和進行中的非同步請求的最大數量 DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
-
註意事項
- Timeout:定義請求超時時間,非同步請求多久沒完成會被認為是超時了
- Capacity:定義了同時進行的非同步請求的數量,可以限制併發請求數量,不會積壓過多的請求
- 超時處理:預設當一個非同步 I/O 請求超時時,會引發異常並重新啟動作業。 如果要處理超時,可以覆蓋該
AsyncFunction的timeout
方法來自定義超時之後的處理方式 - 響應結果的順序:AsyncDataStream包含兩種輸出模式,
- unorderedWait無序:響應結果的順序與非同步請求的順序不同
- orderedWait有序:響應結果的順序與非同步請求的順序相同