Spark SQL源碼剖析(一)SQL解析框架Catalyst流程概述

来源:https://www.cnblogs.com/listenfwind/archive/2020/04/21/12724381.html
-Advertisement-
Play Games

Spark SQL模塊,主要就是處理跟SQL解析相關的一些內容,說得更通俗點就是怎麼把一個SQL語句解析成Dataframe或者說RDD的任務。以Spark 2.4.3為例,Spark SQL這個大模塊分為三個子模塊,如下圖所示 其中Catalyst可以說是Spark內部專門用來解析SQL的一個框架 ...


Spark SQL模塊,主要就是處理跟SQL解析相關的一些內容,說得更通俗點就是怎麼把一個SQL語句解析成Dataframe或者說RDD的任務。以Spark 2.4.3為例,Spark SQL這個大模塊分為三個子模塊,如下圖所示

spark sql模塊

其中Catalyst可以說是Spark內部專門用來解析SQL的一個框架,在Hive中類似的框架是Calcite(將SQL解析成MapReduce任務)。Catalyst將SQL解析任務分成好幾個階段,這個在對應的論文中講述得比較清楚,本系列很多內容也會參考論文,有興趣閱讀原論文的可以到這裡看:Spark SQL: Relational Data Processing in Spark

而Core模塊其實就是Spark SQL主要解析的流程,當然這個過程中會去調用Catalyst的一些內容。這模塊裡面比較常用的類包括SparkSession,DataSet等。

至於hive模塊,這個不用說,肯定跟hive有關的。這個模塊在本系列基本不會涉及到,就不多介紹了。

值得一提的是,論文發表的時候還是在Spark1.x階段,那個時候SQL解析成詞法樹用的是scala寫的一個解析工具,到2.x階段改為使用antlr4來做這部分工作(這應該算是最大的改變)。至於為什麼要改,我猜是出於可讀性和易用性方面的考慮,當然這個僅是個人猜測。

另外,這一系列會簡單介紹一條SQL語句的處理流程,基於spark 2.4.3(sql這個模塊在spark2.1後變化不大)。這一篇先從整體介紹Spark SQL出現的背景及解決問題,Dataframe API以及Catalyst的流程大概是怎麼樣,後面分階段細說Catalyst的流程。

Spark SQL出現的背景及解決的問題

在最早的時候,大規模處理數據的技術是MapReduce,但這種框架執行效率太慢,進行一些關係型處理(如join)需要編寫大量代碼。後來hive這種框架可以讓用戶輸入sql語句,自動進行優化並執行。

但在大型系統中,任然有兩個主要問題,一個是ETL操作需要對接多個數據源。另一個是用戶需要執行複雜分析,比如機器學習和圖計算等。但傳統的關係型處理系統中較難實現。

Spark SQL提供了兩個子模塊來解決這個問題,DataFrame API和Catalyst

相比於RDD,Dataframe api提供了更加豐富的關係型api,並且能和RDD相互轉換,後面Spark機器學習方面的工作重心,也從以RDD為基礎的mllib轉移到以Dataframe為基礎的Spark ML(雖然Dataframe底層也是RDD)。

另一個就是Catalyst,通過它可以輕鬆為諸如機器學習之類的域添加數據源(比如json或通過case class自定義的類型),優化規則和數據類型。

通過這兩個模塊,Spark SQL主要實現以下目標:

  • 提供方便易用好的API,包括讀取外部數據源,以及關係數據處理(用過的都知道)
  • 使用已建立的DBMS技術提供高性能。
  • 輕鬆支持新數據源,包括半結構化數據和外部資料庫(比如MYSQL)。
  • 圖計算和機器學習方面的拓展

那下麵就介紹Dataframe和Catalyst的流程,當然主要討論的還是Catalyst。

統一API Dataframe

先來看看論文裡面提供的一張圖:

Dataframe

這張圖可以說明很多,首先Spark的Dataframe API底層也是基於Spark的RDD。但與RDD不同的在於,Dataframe會持有schema(這個實在不好翻譯,可以理解為數據的結構吧),以及可以執行各種各樣的關係型操作,比如Select,Filter,Join,Groupby等。從操作上來說,和pandas的Dataframe有點像(連名字都是一樣的)。

同時因為是基於RDD的,所以很多RDD的特性Dataframe都能夠享受到,比如說分散式計算中一致性,可靠性方面的保證,以及可以通過cache緩存數據,提高計算性能啊等等。

同時圖中頁展示了Dataframe可以通過JDBC鏈接外部資料庫,通過控制台操作(spark-shell),或者用戶程式。說白了,就是Dataframe可以通過RDD轉換而來,也可以通過外部數據表生成

對了,這裡順便說一句,很多初次接觸Spark SQL的童鞋可能會對Dataset和Dataframe這兩個東西感到疑惑,在1.x時代它們確實有些差別,不過在spark2.x的時候,這兩個API已經統一了。所以基本上Dataset和Dataframe可以看成是等價的東西

最後還是結合代碼做一下實際的展示吧,如下展示生成一個RDD,並且根據這個RDD生成對應的Dataframe,從中可以看出RDD和Dataframe的區別:

//生成RDD
scala> val data = sc.parallelize(Array((1,2),(3,4)))
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> data.foreach(println)
(1,2)
(3,4)

scala> val df = data.toDF("fir","sec")
df: org.apache.spark.sql.DataFrame = [fir: int, sec: int]

scala> df.show()
+---+---+
|fir|sec|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

//跟RDD相比,多了schema
scala> df.printSchema()
root
 |-- fir: integer (nullable = false)
 |-- sec: integer (nullable = false)

Catalyst流程解析

Catalyst在論文中被叫做優化器(Optimizer),這部分是論文裡面較為核心的內容,不過其實流程還是蠻好理解的,依舊貼下論文裡面的圖。

catalyst流程

主要流程大概可以分為以下幾步:

  1. Sql語句經過Antlr4解析,生成Unresolved Logical Plan(有使用過Antlr4的童鞋肯定對這一過程不陌生)
  2. analyzer與catalog進行綁定(catlog存儲元數據),生成Logical Plan;
  3. optimizer對Logical Plan優化,生成Optimized LogicalPlan;
  4. SparkPlan將Optimized LogicalPlan轉換成 Physical Plan;
  5. prepareForExecution()將 Physical Plan 轉換成 executed Physical Plan;
  6. execute()執行可執行物理計劃,得到RDD;

提前說一下吧,上述流程多數是在org.apache.spark.sql.execution.QueryExecution這個類裡面,這個貼一下簡單的代碼,看看就好,先不多做深究。後面的文章會詳細介紹這裡的內容。

class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

  ......其他代碼
  
  //analyzer階段
  lazy val analyzed: LogicalPlan = {
    SparkSession.setActiveSession(sparkSession)
    sparkSession.sessionState.analyzer.executeAndCheck(logical)
  }


  //optimizer階段
  lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
  
  //SparkPlan階段
  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }

  //prepareForExecution階段
  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  //execute階段
  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

  ......其他代碼
}

值得一提的是每個階段都使用了lazy懶載入,對這塊感興趣可以看看我之前的文章Scala函數式編程(六) 懶載入與Stream

上述主要介紹Spark SQL模塊內容,其出現的背景以及主要解決問題。而後簡單介紹下Dataframe API的內容,以及Spark SQL解析SQL的內部框架Catalyst。後續主要會介紹Catalyst中各個步驟的流程,結合源碼來做一些分析。

以上~


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 有很多朋友問我學習了Python後,有沒有什麼好的項目可以練手。 其實,做項目主要還是根據需求來的。但是對於一個初學者來說,很多複雜的項目沒辦法獨立完成,因此博主挑選了一個非常適合初學者的項目,內容不是很複雜,但是非常有趣,我相信對於初學者小白來說是再好不過的項目了。 這個項目中,我們將要建立一個比 ...
  • 1 #include <iostream> 2 #include <cstring> 3 4 using namespace std; 5 6 class Person 7 { 8 private: 9 char Name[10]; 10 char Sex; 11 int Age; 12 publi ...
  • 在使用 SpringMVC 框架的過程中,如果前臺有包含中文的請求,或者後臺有包含中文的響應,有可能會出現亂碼的情況。在以前的 Servlet 中,我們使用 和 來設置請求和響應的編碼。在 SpringMVC 中,框架直接給我們提供了一個用來解決請求和響應的亂碼問題的過濾器 。 註意:請將 Char ...
  • C++ 預設參數 預設參數是指當函數調用中省略了實參時自動使用的一個值。 對於帶參數列表的函數,必須從右向左添加預設值。 實參按從左到右的順序一次被賦給相應的形參,而不能跳過任何參數。 註意:只有原型指定了預設值,函數定義與沒有預設參數時完全相同。 ...
  • Laravel 團隊昨天發佈了 v7.6.0,其中包含 13 個新功能以及 7.x 分支的最新修複和更改: 集合新增 “until” 方法 Jason McCreary 貢獻了 Collection::until() 方法, 該方法可以迴圈遍歷集合直到元素滿足條件再將該元素返回: // Before ...
  • 如果我們把ES作為某種資料庫來使用的話,必須熟練掌握ES的CRUD操作。在這之前先更正一下上篇中關於檢查索引是否存在的方法:elastic4s的具體調用如下: //刪除索引 val rspExists = client.execute(indexExists("company")).await if ...
  • 我的LeetCode:https://leetcode cn.com/u/ituring/ 我的LeetCode刷題源碼[GitHub]:https://github.com/izhoujie/Algorithmcii LeetCode 560. 和為K的子數組 題目 給定一個整數數組和一個整數 k ...
  • 通俗理解spring源碼(四)—— 獲取Docment 上節講到了xmlBeanDefinitionReader.doLoadDocument(InputSource inputSource, Resource resource)方法: protected Document doLoadDocume ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...