Spark源碼解析(一):RDD之Transfrom運算元

来源:https://www.cnblogs.com/river97/archive/2023/03/31/17277318.html
-Advertisement-
Play Games

一、延遲計算 RDD 代表的是分散式數據形態,因此,RDD 到 RDD 之間的轉換,本質上是數據形態上的轉換(Transformations) 在 RDD 的編程模型中,一共有兩種運算元,Transformations 類運算元和 Actions 類運算元。開發者需要使用 Transformations ...


一、延遲計算

RDD 代表的是分散式數據形態,因此,RDD 到 RDD 之間的轉換,本質上是數據形態上的轉換(Transformations)

在 RDD 的編程模型中,一共有兩種運算元,Transformations 類運算元和 Actions 類運算元。開發者需要使用 Transformations 類運算元,定義並描述數據形態的轉換過程,然後調用 Actions 類運算元,將計算結果收集起來、或是物化到磁碟。

在這樣的編程模型下,Spark 在運行時的計算被劃分為兩個環節。

  1. 基於不同數據形態之間的轉換,構建計算流圖(DAG,Directed Acyclic Graph)
  2. 通過 Actions 類運算元,以回溯的方式去觸發執行這個計算流圖

換句話說,開發者調用的各類 Transformations 運算元,並不立即執行計算,當且僅當開發者調用 Actions 運算元時,之前調用的轉換運算元才會付諸執行。在業內,這樣的計算模式有個專門的術語,叫作“延遲計算”(Lazy Evaluation)。

二、Spark運算元分類

在 RDD 的開發框架下,哪些運算元屬於 Transformations 運算元,哪些運算元是 Actions 運算元呢?

這裡給出一張自己在極客看的課程中的圖

img

三、Transform運算元執行流程(源碼)

Map轉換算是 RDD 的經典轉換操作之一了.就以它開頭.Map的源碼如下:

image-20230224103912741

1. sc.clean(f)

首先掉了一個sc.clean(f) , 我們進到clean函數里看下:

image-20230224104117191

註釋中明確提到了這個函數的功能:clean 整理一個閉包,使其可以序列化併發送到任務.

這裡的代碼有些多,大概知道這個函數的功能是這樣就ok了,閉包的問題會在另一篇文章里仔細介紹

2. MapPartitionsRDD

進入到函數後源碼如下:

image-20230224105719758

這是一個MapPartitionsRDD。我們仔細看它的構成,從而來理解它是如何描述MapPartitionsRDD的.

2.1 var prev:RDD[T]

這裡的 prev 就是父RDD,f 則是Map中傳入的處理函數,除了這兩個就沒有了,也就是說明 RDD中沒有存儲具體的數據本身

這再次印證了轉換不會產生任何數據.它只是單純了記錄父RDD以及如何轉換的過程就完了,不會在轉換階段產生任何數據集

2.2 preservesPartitioning

preservesPartitioning 表示是否保持父RDD的分區信息.
如果為false(預設為false),則會對結果重新分區.也就是Map系預設都會分區
如果為true,保留分區. 則按照 firstParent 保留分區   

image-20230224110557226

可以看到根據 dependencies 找到其第一個父 RDD

image-20230224110711910

2.3 compute 計算邏輯
2.3.1 compute方法

RDD 抽象類要求其所有子類都必須實現 compute 方法,該方法接受的參數之一是一個Partition 對象,目的是計算該分區中的數據。

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

可以看到,compute 方法調用當前 RDD 內的第一個父 RDD 的 iterator 方法,該方的目的是拉取父 RDD 對應分區內的數據。

iterator 方法會返回一個迭代器對象,迭代器內部存儲的每個元素即父 RDD 對應分區內已經計算完畢的數據記錄。得到的迭代器作為 f 方法的一個參數。fRDD 類的 map 方法中指定,即實際的轉換函數。

compute 方法會將迭代器中的記錄一一輸入 f 方法,得到的新迭代器即為所求分區中的數據。

其他 RDD 子類的 compute 方法與之類似,在需要用到父 RDD 的分區數據時候,就會調用 iterator 方法,然後根據需求在得到的數據之上執行粗粒度的操作。換句話說,compute 函數負責的是父 RDD 分區數據到子 RDD 分區數據的變換邏輯。

2.3.2 iterator方法

此方法的實現在 RDD 這個抽象類中

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementers of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

interator首先檢查 存儲級別 storageLevel:此處可參考RDD持久化

如果存儲級別不是NONE, 說明分區的數據說明分區的數據要麼已經存儲在文件系統當中,要麼當前 RDD 曾經執行過 cachepersise 等持久化操作,此時需要從存儲空間讀取分區數據,調用 getOrCompute 方法

image-20230224114953570

getOrCompute 方法會根據 RDD 編號:id分區編號:partition.index 計算得到當前分區在存儲層對應的塊編號:blockId,通過存儲層提供的數據讀取介面提取出塊的數據。

代碼中的這幾句註釋給的非常到位,大致的判斷順序如下:

  • 塊命中的情況:也就是數據之前已經成功存儲到介質中,這其中可能是數據本身就在存儲介質中(比如通過讀取HDFS創建的RDD),也可能是 RDD 在經過持久化操作並且經歷了一次計算過程,這個時候我們就能成功讀取數據並將其返回
  • 塊未命中的情況:可能是數據已經丟失,或者 RDD 經過持久化操作,但是是當前分區數據是第一次被計算,因此會出現拉取得到數據為 None 的情況。這就意味著我們需要計算分區數據,繼續調用 RDDcomputeOrReadCheckpoint 方法來計算數據,並將計算得到的數據緩存到存儲介質中,下次就無需再重覆計算。

如果當前RDD的存儲級別為 None,說明為未經持久化的 RDD,需要重新計算 RDD 內的數據,這時候調用 RDD 類的 computeOrReadCheckpoint 方法,該方法也在持久化 RDD 的分區獲取數據失敗時被調用。

image-20230224142431572

computeOrReadCheckpoint 方法會檢查當前 RDD 是否已經被標記成檢查點,如果未被標記成檢查點,則執行自身的 compute 方法來計算分區數據,否則就直接拉取父 RDD 分區內的數據。

需要註意的是,對於標記成檢查點的情況,當前 RDD 的父 RDD 不再是原先轉換操作中提供數據的父 RDD,而是被 Apache Spark 替換成一個 CheckpointRDD 對象,該對象中的數據存放在文件系統中,因此最終該對象會從文件系統中讀取數據並返回給 computeOrReadCheckpoint 方法

參考文章:

Cache 和 Checkpoint


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言: https://www.cnblogs.com/LoveBB/p/17277662.html 什麼是範型 JDK 1.5開始引入Java泛型(generics)這個特性,該特性提供了編譯時類型安全檢測機制,允許程式員在編譯時檢測到非法的類型。停,業餘的客套話就不多說了,這些術語,講了N遍,不 ...
  • 還不會 Quartz?如果你還沒有接觸過Quartz,那麼你可能錯過了一個很棒的任務調度框架!Quartz 提供了一種靈活、可靠的方式來管理和執行定時任務,讓咱們的定時任務更加優雅。 ...
  • 1.標識符 程式中對類、變數等的命名,稱為標識符; 標識符命名規則: 由數字、字母、下劃線、美元符組成,不能以數字開頭; 嚴格區分大小寫; 不能與關鍵字或保留字重名; 標識符的命名最好能反應出其作用。 2.關鍵字 程式中對編譯器有特殊意義的詞,例如class被用來定義類,當程式執行遇到class時, ...
  • 模型之間的關係(Relations Between Models) 上一章介紹了為包含基本欄位的模型創建自定義視圖。然而,在任何真實的業務場景中,我們都需要不止一個模型。此外,模型之間的鏈接是必要的。人們可以很容易地想象一個模型包含客戶,另一個模型則包含用戶列表。你可能需要參考任何現有業務模型上的客 ...
  • ChatGPT是一個基於GPT-3.5架構的自然語言處理工具,它具有文本生成、文本分類、對話生成等多種能力。作為一種強大的自然語言處理工具,ChatGPT可以應用於智能客服、智能問答、內容創作等多個領域。如果您對ChatGPT感興趣,可以通過關註本公眾號瞭解更多信息,並體驗基於ChatGPT的小程式... ...
  • Spring Spring為簡化開發而生,讓程式員只關心核心業務的實現,儘可能的不在關註非業務邏輯代碼(事務控制,安全日誌等)。 1,Spring八大模塊 這八大模塊組成了Spring 1.1 Spring Core模塊 這是Spring框架的最基礎的部分,它提供了依賴註入(DependencyIn ...
  • React Native 備忘清單 適合初學者的綜合 React Native 備忘清單,在開始 React Native 之前需要先掌握 react 庫入門,為開發人員分享快速參考備忘單。 React Native (簡稱RN)是Facebook於2015年4月開源的跨平臺移動應用開發框架,是Fa ...
  • React 備忘清單 IT寶庫整理適合初學者入門的React開發速查備忘清單,為開發人員分享快速參考備忘單。 React是用於構建用戶界面的JavaScript庫,起源於Facebook的內部項目,該公司對市場上所有 JavaScript MVC框架都不滿意,決定自行開發一套,用於架設Instagr ...
一周排行
    -Advertisement-
    Play Games
  • Dapr Outbox 是1.12中的功能。 本文只介紹Dapr Outbox 執行流程,Dapr Outbox基本用法請閱讀官方文檔 。本文中appID=order-processor,topic=orders 本文前提知識:熟悉Dapr狀態管理、Dapr發佈訂閱和Outbox 模式。 Outbo ...
  • 引言 在前幾章我們深度講解了單元測試和集成測試的基礎知識,這一章我們來講解一下代碼覆蓋率,代碼覆蓋率是單元測試運行的度量值,覆蓋率通常以百分比表示,用於衡量代碼被測試覆蓋的程度,幫助開發人員評估測試用例的質量和代碼的健壯性。常見的覆蓋率包括語句覆蓋率(Line Coverage)、分支覆蓋率(Bra ...
  • 前言 本文介紹瞭如何使用S7.NET庫實現對西門子PLC DB塊數據的讀寫,記錄了使用電腦模擬,模擬PLC,自至完成測試的詳細流程,並重點介紹了在這個過程中的易錯點,供參考。 用到的軟體: 1.Windows環境下鏈路層網路訪問的行業標準工具(WinPcap_4_1_3.exe)下載鏈接:http ...
  • 從依賴倒置原則(Dependency Inversion Principle, DIP)到控制反轉(Inversion of Control, IoC)再到依賴註入(Dependency Injection, DI)的演進過程,我們可以理解為一種逐步抽象和解耦的設計思想。這種思想在C#等面向對象的編 ...
  • 關於Python中的私有屬性和私有方法 Python對於類的成員沒有嚴格的訪問控制限制,這與其他面相對對象語言有區別。關於私有屬性和私有方法,有如下要點: 1、通常我們約定,兩個下劃線開頭的屬性是私有的(private)。其他為公共的(public); 2、類內部可以訪問私有屬性(方法); 3、類外 ...
  • C++ 訪問說明符 訪問說明符是 C++ 中控制類成員(屬性和方法)可訪問性的關鍵字。它們用於封裝類數據並保護其免受意外修改或濫用。 三種訪問說明符: public:允許從類外部的任何地方訪問成員。 private:僅允許在類內部訪問成員。 protected:允許在類內部及其派生類中訪問成員。 示 ...
  • 寫這個隨筆說一下C++的static_cast和dynamic_cast用在子類與父類的指針轉換時的一些事宜。首先,【static_cast,dynamic_cast】【父類指針,子類指針】,兩兩一組,共有4種組合:用 static_cast 父類轉子類、用 static_cast 子類轉父類、使用 ...
  • /******************************************************************************************************** * * * 設計雙向鏈表的介面 * * * * Copyright (c) 2023-2 ...
  • 相信接觸過spring做開發的小伙伴們一定使用過@ComponentScan註解 @ComponentScan("com.wangm.lifecycle") public class AppConfig { } @ComponentScan指定basePackage,將包下的類按照一定規則註冊成Be ...
  • 操作系統 :CentOS 7.6_x64 opensips版本: 2.4.9 python版本:2.7.5 python作為腳本語言,使用起來很方便,查了下opensips的文檔,支持使用python腳本寫邏輯代碼。今天整理下CentOS7環境下opensips2.4.9的python模塊筆記及使用 ...