併發系列(4)之 Future 框架詳解

来源:https://www.cnblogs.com/sanzao/archive/2019/04/07/10666614.html
-Advertisement-
Play Games

本文將主要講解 J.U.C 中的 Future 框架,並分析結合源碼分析其內部結構邏輯; 一、Future 框架概述 JDK 中的 Future 框架實際就是 Future 模式的實現,通常情況下我們會配合線程池使用,但也可以單獨使用;下麵我們就單獨使用簡單舉例; 1. 應用實例 列印: 如上面代碼 ...


本文將主要講解 J.U.C 中的 Future 框架,並分析結合源碼分析其內部結構邏輯;

一、Future 框架概述

JDK 中的 Future 框架實際就是 Future 模式的實現,通常情況下我們會配合線程池使用,但也可以單獨使用;下麵我們就單獨使用簡單舉例;

1. 應用實例

FutureTask<String> future = new FutureTask<>(() -> {
  log.info("非同步任務執行...");
  Thread.sleep(2000);
  log.info("過了很久很久...");
  return "非同步任務完成";
});

log.info("啟動非同步任務...");
new Thread(future).start();

log.info("繼續其他任務...");
Thread.sleep(1000);

log.info("獲取非同步任務結果:{}", future.get());

列印:

[15:38:03,231 INFO ] [main]     - 啟動非同步任務...
[15:38:03,231 INFO ] [main]     - 繼續其他任務...
[15:38:03,231 INFO ] [Thread-0] - 非同步任務執行...
[15:38:05,232 INFO ] [Thread-0] - 過了很久很久...
[15:38:05,236 INFO ] [main]     - 獲取非同步任務結果:非同步任務完成

如上面代碼所示,首先我們將要執行的任務包裝成 Callable,這裡如果不需要返回值也可以使用 Runnable;然後構建 FutureTask 由一個線程啟動,最後使用 Future.get() 獲取非同步任務結果;

2. Future 運行邏輯

對於 Future 模式的流程圖如下:

future

對比上面的實例代碼,大家可能會發現有些不一樣,因為在 FutureTask 同時繼承了 Runnable 和 Future 介面,所以再提交任務後沒有返回Future,而是直接使用自身調用 get;下麵我們就對源碼進行實際分析;


二、源碼分析

1. FutureTask 主體結構

public interface RunnableFuture<V> extends Runnable, Future<V> {}

public class FutureTask<V> implements RunnableFuture<V> {
  private volatile int state;         // 任務運行狀態
  private Callable<V> callable;       // 非同步任務
  private Object outcome;             // 返回結果
  private volatile Thread runner;     // 非同步任務執行線程
  private volatile WaitNode waiters;  // 等待非同步結果的線程棧(通過Treiber stack演算法實現)
  
  public FutureTask(Callable<V> callable) {  // 需要返回值
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;     // ensure visibility of callable
  }
  
  public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;     // ensure visibility of callable
  }
  ...
}

另外在代碼中還可以看見有很多地方都是用了 CAS 來更新變數,而 JDK1.6 中甚至使用了 AQS 來實現;其原因就是同一個 FutureTask 可以多個線程同時提交,也可以多個線程同時獲取; 所以代碼中有很多的狀態變數:

// FutureTask.state 取值
private static final int NEW          = 0;  // 初始化到結果返回前
private static final int COMPLETING   = 1;  // 結果賦值
private static final int NORMAL       = 2;  // 執行完畢
private static final int EXCEPTIONAL  = 3;  // 執行異常
private static final int CANCELLED    = 4;  // 任務取消
private static final int INTERRUPTING = 5;  // 設置中斷狀態
private static final int INTERRUPTED  = 6;  // 任務中斷

同時源碼的註釋中也詳細給出了可能出現的狀態轉換:

  • NEW -> COMPLETING -> NORMAL // 任務正常執行
  • NEW -> COMPLETING -> EXCEPTION // 任務執行異常
  • NEW ->CANCELLED // 任務取消
  • NEW -> INITERRUPTING -> INTERRUPTED // 任務中斷

註意這裡的 COMPLETING 狀態是一個很微妙的狀態,正因為有他的存在才能實現無鎖賦值;大家先留意這個狀態,然後在代碼中應該能體會到;另外這裡還有一個變數需要註意,WaitNode ;使用 Treiber stack 演算法實現的無鎖棧;其原理說明可以參考下麵第三節;


2. 任務執行

public void run() {
  if (state != NEW ||  // 確保任務執行完成後,不再重覆執行
    !UNSAFE.compareAndSwapObject(this, runnerOffset, 
                                 null, Thread.currentThread()))  // 確保只有一個線程執行
    return;
  try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
      V result;
      boolean ran;
      try {
        result = c.call();
        ran = true;
      } catch (Throwable ex) {
        result = null;
        ran = false;
        setException(ex);    // 設置異常結果
      }
      if (ran) set(result);  // 設置結果
    }
  } finally {
    runner = null;
    int s = state;
    if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s);  // 確保中斷狀態已經設置
  }
}
// 設置非同步任務結果
protected void set(V v) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  // 保證結果只能設置一次
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion(); // 喚醒等待線程
  }
}
protected void setException(Throwable t) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  // 保證結果只能設置一次
    outcome = t;
    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    finishCompletion();
  }
}


3. 任務取消

public boolean cancel(boolean mayInterruptIfRunning) {
  if (!(state == NEW &&  // 只有在任務執行階段才能取消
      UNSAFE.compareAndSwapInt(this, stateOffset, NEW,  // 設置取消狀態
        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    return false;
  try {  // in case call to interrupt throws exception
    if (mayInterruptIfRunning) {
      try {
        Thread t = runner;
        if (t != null)
          t.interrupt();
      } finally { // final state
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
      }
    }
  } finally {
    finishCompletion();
  }
  return true;
}

註意 cancel(false) 也就是僅取消,並沒有打斷;非同步任務會繼續執行,只是這裡首先設置了 FutureTask.state = CANCELLED ,所以最後在設置結果的時候會失敗,UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)


4. 獲取結果

public V get() throws InterruptedException, ExecutionException {
  int s = state;
  if (s <= COMPLETING)
    s = awaitDone(false, 0L);  // 阻塞等待
  return report(s);
}

private V report(int s) throws ExecutionException {  // 根據最後的狀態返回結果
  Object x = outcome;
  if (s == NORMAL) return (V)x;
  if (s >= CANCELLED) throw new CancellationException();
  throw new ExecutionException((Throwable)x);
}
private int awaitDone(boolean timed, long nanos)
  throws InterruptedException {
  final long deadline = timed ? System.nanoTime() + nanos : 0L;
  WaitNode q = null;
  boolean queued = false;
  for (;;) {
    if (Thread.interrupted()) {
      removeWaiter(q);   // 移除等待節點
      throw new InterruptedException();
    }

    int s = state;
    if (s > COMPLETING) {  // 任務已完成
      if (q != null)
        q.thread = null;
      return s;
    }
    else if (s == COMPLETING) // 正在賦值,直接先出讓線程
      Thread.yield();
    else if (q == null)       // 任務還未完成需要等待
      q = new WaitNode();
    else if (!queued)
      queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                         q.next = waiters, q);   // 使用 Treiber stack 演算法
    else if (timed) {
      nanos = deadline - System.nanoTime();
      if (nanos <= 0L) {
        removeWaiter(q);
        return state;
      }
      LockSupport.parkNanos(this, nanos);
    }
    else
      LockSupport.park(this);
  }
}


三、Treiber stack

在《Java 併發編程實戰》中講了, 創建非阻塞演算法的關鍵在於,找出如何將原子修改的範圍縮小到單個變數上,同時還要維護數據的一致性 。

@ThreadSafe public class ConcurrentStack <E> {
  AtomicReference<Node<E>> top = new AtomicReference<>();
  
  private static class Node <E> {
    public final E item;
    public Node<E> next;

    public Node(E item) {
      this.item = item;
    }
  }

  public void push(E item) {
    Node<E> newHead = new Node<>(item);
    Node<E> oldHead;
    do {
      oldHead = top.get();
      newHead.next = oldHead;
    } while (!top.compareAndSet(oldHead, newHead));
  }

  public E pop() {
    Node<E> oldHead;
    Node<E> newHead;
    do {
      oldHead = top.get();
      if (oldHead == null)
        return null;
      newHead = oldHead.next;
    } while (!top.compareAndSet(oldHead, newHead));
    return oldHead.item;
  }
}


總結

  • 總體來講源碼比較簡單,因為其本身只是一個 Future 模式的實現
  • 但是其中的狀態量的設置,還有裡面很多無鎖的處理方式,才是 FutureTask 帶給我們的精華!

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 2019 04 07 第一次寫博客就記錄最基礎的c語言下的利用迴圈 使用時間戳的猜數字游戲 vs 實現 #define _CRT_SECURE_NO_WARNINGS // vs 下的巨集定義 (scanf) #include<stdio.h> #include<stdlib.h> #include< ...
  • 原理: 從客戶端上傳到伺服器 照片——文件夾——資料庫 例如:桌面一張照片,在tomacat里創建upload文件夾,把桌面照片上傳到upload文件夾里,並且把照片的名字取出來,取完名字把這個名字插入到資料庫裡面,下次要想取就取這個名字到upload文件夾下麵去尋找這個照片,找到以後寫相對路徑,就 ...
  • 給定一個 n × n 的二維矩陣表示一個圖像。將圖像順時針旋轉 90 度。說明:你必須在原地旋轉圖像,這意味著你需要直接修改輸入的二維矩陣。請不要使用另一個矩陣來旋轉圖像。示例 1:給定 matrix = [ [1,2,3], [4,5,6], [7,8,9]],原地旋轉輸入矩陣,使其變為:[ [7 ...
  • 給定一個沒有重覆數字的序列,返回其所有可能的全排列。示例:輸入: [1,2,3]輸出:[ [1,2,3], [1,3,2], [2,1,3], [2,3,1], [3,1,2], [3,2,1]] 兩種方法,第一種用了STL中的函數,第二種用遞歸+回溯,我個人很喜歡第二種方法 ...
  • 使用Consul提供註冊和發現服務 什麼是 Consul Consul 是 HashiCorp 公司推出的開源工具,用於實現分散式系統的服務發現與配置。與其它分散式服務註冊與發現的方案,Consul 的方案更“一站式”,內置了服務註冊與發現框架、分佈一致性協議實現、健康檢查、Key/Value 存儲 ...
  • 前言 - 那久遠的故事 工作好多年, 有時腦海裡總回想兒時看的夢. 那時還剛上初中, 班上個子小的同學, 閑暇娛樂可能就是看 <飄渺之旅> 之類的小說. 前幾年嘗試滿足自己少年時的記憶. 於是寫加整理了這本書 <C 修真之旅> 讓我們蕩起雙槳 - https://music.163.com/#/so ...
  • 1.Sublime Text: 這是一個輕量級的代碼編輯器,跨平臺,支持幾十種編程語言,包括Python,Java,C/C++等,小巧靈活,運行輕快,支持代碼高亮、自動補全、語法提示,插件擴展豐富,是一個很不錯的代碼編輯器,配置相關文件後,可直接運行python程式: 2.VS Code: 這是微軟 ...
  • 最近在學習Python,不得不說,Python真的是一門很好用的語言。但是學習的過程中關於變數作用域(scope)的命名空間(namespace)的問題真的把我給搞懵了。在查閱了相關資料之後,覺得自己對Python的作用域和命名空間有了一定得瞭解。故寫在這裡,一方面加深自己的理解,另一方面分享知識。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...