一、延遲計算 RDD 代表的是分散式數據形態,因此,RDD 到 RDD 之間的轉換,本質上是數據形態上的轉換(Transformations) 在 RDD 的編程模型中,一共有兩種運算元,Transformations 類運算元和 Actions 類運算元。開發者需要使用 Transformations ...
一、延遲計算
RDD 代表的是分散式數據形態,因此,RDD 到 RDD 之間的轉換,本質上是數據形態上的轉換(Transformations)
在 RDD 的編程模型中,一共有兩種運算元,Transformations 類運算元和 Actions 類運算元。開發者需要使用 Transformations 類運算元,定義並描述數據形態的轉換過程,然後調用 Actions 類運算元,將計算結果收集起來、或是物化到磁碟。
在這樣的編程模型下,Spark 在運行時的計算被劃分為兩個環節。
- 基於不同數據形態之間的轉換,構建計算流圖(DAG,Directed Acyclic Graph)
- 通過 Actions 類運算元,以回溯的方式去觸發執行這個計算流圖
換句話說,開發者調用的各類 Transformations 運算元,並不立即執行計算,當且僅當開發者調用 Actions 運算元時,之前調用的轉換運算元才會付諸執行。在業內,這樣的計算模式有個專門的術語,叫作“延遲計算”(Lazy Evaluation)。
二、Spark運算元分類
在 RDD 的開發框架下,哪些運算元屬於 Transformations 運算元,哪些運算元是 Actions 運算元呢?
這裡給出一張自己在極客看的課程中的圖
三、Transform運算元執行流程(源碼)
Map轉換算是 RDD 的經典轉換操作之一了.就以它開頭.Map的源碼如下:
1. sc.clean(f)
首先掉了一個sc.clean(f) , 我們進到clean函數里看下:
註釋中明確提到了這個函數的功能:clean 整理一個閉包,使其可以序列化併發送到任務.
這裡的代碼有些多,大概知道這個函數的功能是這樣就ok了,閉包的問題會在另一篇文章里仔細介紹
2. MapPartitionsRDD
進入到函數後源碼如下:
這是一個MapPartitionsRDD。我們仔細看它的構成,從而來理解它是如何描述MapPartitionsRDD的.
2.1 var prev:RDD[T]
這裡的 prev 就是父RDD,f 則是Map中傳入的處理函數,除了這兩個就沒有了,也就是說明 RDD中沒有存儲具體的數據本身
這再次印證了轉換不會產生任何數據.它只是單純了記錄父RDD以及如何轉換的過程就完了,不會在轉換階段產生任何數據集
2.2 preservesPartitioning
preservesPartitioning 表示是否保持父RDD的分區信息.
如果為false(預設為false),則會對結果重新分區.也就是Map系預設都會分區
如果為true,保留分區. 則按照 firstParent 保留分區
可以看到根據 dependencies 找到其第一個父 RDD
2.3 compute 計算邏輯
2.3.1 compute方法
RDD
抽象類要求其所有子類都必須實現 compute
方法,該方法接受的參數之一是一個Partition
對象,目的是計算該分區中的數據。
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
可以看到,compute 方法調用當前 RDD 內的第一個父 RDD 的 iterator 方法,該方的目的是拉取父 RDD
對應分區內的數據。
iterator
方法會返回一個迭代器對象,迭代器內部存儲的每個元素即父 RDD 對應分區內已經計算完畢的數據記錄。得到的迭代器作為 f
方法的一個參數。f
在 RDD
類的 map
方法中指定,即實際的轉換函數。
compute
方法會將迭代器中的記錄一一輸入 f
方法,得到的新迭代器即為所求分區中的數據。
其他 RDD
子類的 compute
方法與之類似,在需要用到父 RDD 的分區數據時候,就會調用 iterator
方法,然後根據需求在得到的數據之上執行粗粒度的操作。換句話說,compute
函數負責的是父 RDD
分區數據到子 RDD
分區數據的變換邏輯。
2.3.2 iterator方法
此方法的實現在 RDD 這個抽象類中
/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementers of custom
* subclasses of RDD.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
interator首先檢查 存儲級別 storageLevel:此處可參考RDD持久化
如果存儲級別不是NONE, 說明分區的數據說明分區的數據要麼已經存儲在文件系統當中,要麼當前 RDD 曾經執行過 cache
、 persise
等持久化操作,此時需要從存儲空間讀取分區數據,調用 getOrCompute 方法
getOrCompute 方法會根據 RDD 編號:id 與 分區編號:partition.index 計算得到當前分區在存儲層對應的塊編號:blockId,通過存儲層提供的數據讀取介面提取出塊的數據。
代碼中的這幾句註釋給的非常到位,大致的判斷順序如下:
- 塊命中的情況:也就是數據之前已經成功存儲到介質中,這其中可能是數據本身就在存儲介質中(比如通過讀取HDFS創建的RDD),也可能是 RDD 在經過持久化操作並且經歷了一次計算過程,這個時候我們就能成功讀取數據並將其返回
- 塊未命中的情況:可能是數據已經丟失,或者 RDD 經過持久化操作,但是是當前分區數據是第一次被計算,因此會出現拉取得到數據為
None
的情況。這就意味著我們需要計算分區數據,繼續調用RDD
類computeOrReadCheckpoint
方法來計算數據,並將計算得到的數據緩存到存儲介質中,下次就無需再重覆計算。
如果當前RDD的存儲級別為 None
,說明為未經持久化的 RDD
,需要重新計算 RDD 內的數據,這時候調用 RDD
類的 computeOrReadCheckpoint
方法,該方法也在持久化 RDD 的分區獲取數據失敗時被調用。
computeOrReadCheckpoint 方法會檢查當前 RDD 是否已經被標記成檢查點,如果未被標記成檢查點,則執行自身的 compute
方法來計算分區數據,否則就直接拉取父 RDD 分區內的數據。
需要註意的是,對於標記成檢查點的情況,當前 RDD 的父 RDD 不再是原先轉換操作中提供數據的父 RDD,而是被 Apache Spark 替換成一個 CheckpointRDD 對象,該對象中的數據存放在文件系統中,因此最終該對象會從文件系統中讀取數據並返回給 computeOrReadCheckpoint
方法
參考文章: