如何擴展Spark Catalyst,抓取spark sql 語句,通過listenerBus發送sql event以及編寫自定義的Spark SQL引擎

来源:https://www.cnblogs.com/laoqing/archive/2022/06/07/16351482.html
-Advertisement-
Play Games

1、Spark Catalyst擴展點 Spark catalyst的擴展點在SPARK-18127中被引入,Spark用戶可以在SQL處理的各個階段擴展自定義實現,非常強大高效,是SparkSQL的核心組件(查詢優化器),它負責將SQL語句轉換成物理執行計劃,Catalyst的優劣決定了SQL執行 ...


1、Spark Catalyst擴展點

Spark catalyst的擴展點在SPARK-18127中被引入,Spark用戶可以在SQL處理的各個階段擴展自定義實現,非常強大高效,是SparkSQL的核心組件(查詢優化器),它負責將SQL語句轉換成物理執行計劃,Catalyst的優劣決定了SQL執行的性能。Catalyst Optimizer是SparkSQL的核心組件(查詢優化器),它負責將SQL語句轉換成物理執行計劃,Catalyst的優劣決定了SQL執行的性能。查詢優化器是一個SQL引擎的核心,開源常用的有Apache Calcite(很多開源組件都通過引入Calcite來實現查詢優化,如Hive/Phoenix/Drill等),另外一個是orca(HAWQ/GreenPlum中使用)。

2、SparkSessionExtensions

SparkSessionExtensions保存了所有用戶自定義的擴展規則,自定義規則保存在成員變數中,對於不同階段的自定義規則,SparkSessionExtensions提供了不同的介面。
api文檔地址:https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/SparkSessionExtensions.html

SparkSessionExtensions
classSparkSessionExtensions extends AnyRef
ExperimentalDeveloper API
Holder for injection points to the SparkSession. We make NO guarantee about the stability regarding binary compatibility and source compatibility of methods here.

This current provides the following extension points:

Analyzer Rules.
Check Analysis Rules.
Optimizer Rules.
Pre CBO Rules.
Planning Strategies.
Customized Parser.
(External) Catalog listeners.
Columnar Rules.
Adaptive Query Stage Preparation Rules.
The extensions can be used by calling withExtensions on the SparkSession.Builder, for example:

SparkSession.builder()
  .master("...")
  .config("...", true)
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectParser { (session, parser) =>
      ...
    }
  }
  .getOrCreate()
The extensions can also be used by setting the Spark SQL configuration property spark.sql.extensions. Multiple extensions can be set using a comma-separated list. For example:

SparkSession.builder()
  .master("...")
  .config("spark.sql.extensions", "org.example.MyExtensions,org.example.YourExtensions")
  .getOrCreate()

class MyExtensions extends Function1[SparkSessionExtensions, Unit] {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectParser { (session, parser) =>
      ...
    }
  }
}

class YourExtensions extends SparkSessionExtensionsProvider {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectFunction(...)
  }
}
Note that none of the injected builders should assume that the SparkSession is fully initialized and should not touch the session's internals (e.g. the SessionState).

Annotations
@DeveloperApi() @Experimental() @Unstable()
Source
SparkSessionExtensions.scala

Filter all members
Instance Constructors
newSparkSessionExtensions()
Type Members
typeCheckRuleBuilder = (SparkSession) ⇒ (LogicalPlan) ⇒ Unit
typeColumnarRuleBuilder = (SparkSession) ⇒ ColumnarRule
typeFunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder)
typeParserBuilder = (SparkSession, ParserInterface) ⇒ ParserInterface
typeQueryStagePrepRuleBuilder = (SparkSession) ⇒ Rule[SparkPlan]
typeRuleBuilder = (SparkSession) ⇒ Rule[LogicalPlan]
typeStrategyBuilder = (SparkSession) ⇒ Strategy
typeTableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
Value Members
definjectCheckRule(builder: CheckRuleBuilder): Unit
Inject an check analysis Rule builder into the SparkSession.

definjectColumnar(builder: ColumnarRuleBuilder): Unit
Inject a rule that can override the columnar execution of an executor.

definjectFunction(functionDescription: FunctionDescription): Unit
Injects a custom function into the org.apache.spark.sql.catalyst.analysis.FunctionRegistry at runtime for all sessions.

definjectOptimizerRule(builder: RuleBuilder): Unit
Inject an optimizer Rule builder into the SparkSession.

definjectParser(builder: ParserBuilder): Unit
Inject a custom parser into the SparkSession.

definjectPlannerStrategy(builder: StrategyBuilder): Unit
Inject a planner Strategy builder into the SparkSession.

definjectPostHocResolutionRule(builder: RuleBuilder): Unit
Inject an analyzer Rule builder into the SparkSession.

definjectPreCBORule(builder: RuleBuilder): Unit
Inject an optimizer Rule builder that rewrites logical plans into the SparkSession.

definjectQueryStagePrepRule(builder: QueryStagePrepRuleBuilder): Unit
Inject a rule that can override the query stage preparation phase of adaptive query execution.

definjectResolutionRule(builder: RuleBuilder): Unit
Inject an analyzer resolution Rule builder into the SparkSession.

definjectTableFunction(functionDescription: TableFunctionDescription): Unit
Injects a custom function into the org.apache.spark.sql.catalyst.analysis.TableFunctionRegistry at runtime for all sessions.

轉載請註明張永清 博客園:https://www.cnblogs.com/laoqing/p/16351482.html

2.1 新增自定義規則

用戶可以通過SparkSessionExtensions提供的inject開頭的方法添加新的自定義規則,具體的inject介面如下:

  • injectOptimizerRule – 添加optimizer自定義規則,optimizer負責邏輯執行計劃的優化。
  • injectParser – 添加parser自定義規則,parser負責SQL解析。
  • injectPlannerStrategy – 添加planner strategy自定義規則,planner負責物理執行計劃的生成。
  • injectResolutionRule – 添加Analyzer自定義規則到Resolution階段,analyzer負責邏輯執行計劃生成。
  • injectPostHocResolutionRule – 添加Analyzer自定義規則到Post Resolution階段。
  • injectCheckRule – 添加Analyzer自定義Check規則。

Spark Catalyst的SQL處理分成parser,analyzer,optimizer以及planner等多個步驟,其中analyzer,optimizer等步驟內部也分為多個階段,以Analyzer為例,analyse規則切分到不同的batch中,每個batch的執行策略可能不盡相同,有的只會執行一遍,有的會迭代執行直到滿足一定條件。

2.2 獲取自定義規則

SparkSessionExtensions對應每一種自定義規則也都有一個build開頭的方法用於獲取對應類型的自定義規則,Spark session在初始化的時候,通過這些方法獲取自定義規則並傳遞給parser,analyzer,optimizer以及planner等對象。

  • buildOptimizerRules
  • buildParser
  • buildPlannerStrategies
  • buildResolutionRules
  • buildPostHocResolutionRules
  • buildCheckRules

2.3 配置自定義規則

在Spark中,用戶自定義的規則可以通過兩種方式配置生效:

  • 使用SparkSession.Builder中的withExtenstion方法,withExtension方法是一個高階函數,接收一個自定義函數作為參數,這個自定義函數以SparkSessionExtensions作為參數,用戶可以實現這個函數,通過SparkSessionExtensions的inject開頭的方法添加用戶自定義規則。
  • 通過Spark配置參數,具體參數名為spark.sql.extensions。用戶可以將1中的自定義函數實現定義為一個類,將完整類名作為參數值。

具體的用法用戶可以參考org.apache.spark.sql.SparkSessionExtensionSuite測試用例中的Spark代碼。

3、擴展Spark Catalyst

3.1 通過listenerBus發送sql event

package org.apache.spark.sql.execution
//轉載請註明張永清 博客園:https://www.cnblogs.com/laoqing/p/16351482.html
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

case class SqlEvent(sqlText: String, sparkSession: SparkSession) extends org.apache.spark.scheduler.SparkListenerEvent with Logging

class MySqlParser(sparkSession: SparkSession, val delegate : org.apache.spark.sql.catalyst.parser.ParserInterface) extends scala.AnyRef with org.apache.spark.sql.catalyst.parser.ParserInterface with Logging{
  override def parsePlan(sqlText: String): LogicalPlan = {
    logInfo("start to send SqlEvent by listenerBus,sqlText:"+sqlText)
    sparkSession.sparkContext.listenerBus.post( SqlEvent(sqlText,sparkSession))
    logInfo("send SqlEvent success by listenerBus,sqlText:"+sqlText)
    delegate.parsePlan(sqlText)
  }
  
    override def parseExpression(sqlText: String): Expression = {
    delegate.parseExpression(sqlText)
  }

  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
    delegate.parseTableIdentifier(sqlText)

  }

  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
    delegate.parseFunctionIdentifier(sqlText)

  }



  override def parseTableSchema(sqlText: String): StructType = {
    delegate.parseTableSchema(sqlText)

  }

  override def parseDataType(sqlText: String): DataType = {
    delegate.parseDataType(sqlText)
  }

}
import org.apache.spark.sql.SparkSessionExtensions


class MySparkSessionExtension  extends ((SparkSessionExtensions) => Unit) {

  override def apply(extensions: SparkSessionExtensions): Unit = {

    extensions.injectParser { (session, parser) =>

      new MySqlParser(session, parser)
    }


  }


}
SparkSession.builder()
  .master("...")
  .config("spark.sql.extensions", "MySparkSessionExtension") .getOrCreate()

  

3.2創建一個自定義Parser

class StrictParser(parser: ParserInterface) extends ParserInterface {
  /**
   * Parse a string to a [[LogicalPlan]].
   */
  override def parsePlan(sqlText: String): LogicalPlan = {
    val logicalPlan = parser.parsePlan(sqlText)
    logicalPlan transform {
      case project @ Project(projectList, _) =>
        projectList.foreach {
          name =>
            if (name.isInstanceOf[UnresolvedStar]) {
              throw new RuntimeException("You must specify your project column set," +
                " * is not allowed.")
            }
        }
        project
    }
    logicalPlan
  }
 
  /**
   * Parse a string to an [[Expression]].
   */
  override def parseExpression(sqlText: String): Expression = parser.parseExpression(sqlText)
 
  /**
   * Parse a string to a [[TableIdentifier]].
   */
  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    parser.parseTableIdentifier(sqlText)
 
  /**
   * Parse a string to a [[FunctionIdentifier]].
   */
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    parser.parseFunctionIdentifier(sqlText)
 
  /**
   * Parse a string to a [[StructType]]. The passed SQL string should be a comma separated
   * list of field definitions which will preserve the correct Hive metadata.
   */
  override def parseTableSchema(sqlText: String): StructType =
    parser.parseTableSchema(sqlText)
 
  /**
   * Parse a string to a [[DataType]].
   */
  override def parseDataType(sqlText: String): DataType = parser.parseDataType(sqlText)
}

創建擴展點函數

type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
type ExtensionsBuilder = SparkSessionExtensions => Unit
val parserBuilder: ParserBuilder = (_, parser) => new StrictParser(parser)
val extBuilder: ExtensionsBuilder = { e => e.injectParser(parserBuilder)}

extBuilder函數用於SparkSession構建,SparkSessionExtensions.injectParser函數本身也是一個高階函數,接收parserBuilder作為參數,將原生parser作為參數傳遞給自定義的StrictParser,並將StrictParser作為自定義parser插入SparkSessionExtensions中。

在SparkSession中啟用自定義Parser

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.master", "local[2]")
  .withExtensions(extBuilder)
  .getOrCreate()

在Spark2.2版本中,引入了新的擴展點,使得用戶可以在Spark session中自定義自己的parser,analyzer,optimizer以及physical planning stragegy rule。通過兩個簡單的示例,我們展示瞭如何通過Spark提供的擴展點實現SQL檢查以及定製化的執行計劃優化。Spark Catalyst高度的可擴展性使得我們可以非常方便的定製適合自己實際使用場景的SQL引擎,拓展了更多的可能性。我們可以實現特定的SQL方言,針對特殊的數據源做更深入的優化,進行SQL規範檢查,針對特定執行環境制定特定的優化策略等等。

 

作者的原創文章,轉載須註明出處。原創文章歸作者所有,歡迎轉載,但是保留版權。對於轉載了博主的原創文章,不標註出處的,作者將依法追究版權,請尊重作者的成果。
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 運算符重載 運算符重載基礎 函數重載(Function Overloading)可以讓一個函數名有多種功能,在不同情況下進行不同的操作。**運算符重載(Operator Overloading)**也是一個道理,同一個運算符可以有不同的功能。 例子:用+號實現複數加法運算;成員函數重載運算符 #in ...
  • Python文本分析“王心凌”彈幕演示案例,包含步驟:爬蟲+情感判定+情感占比餅圖+Top10高頻詞+詞雲圖。 ...
  • .NET CORE webapi 調用阿裡雲簡訊服務 1.獲取AccessKey 您可以為阿裡雲賬號(主賬號)和RAM用戶創建一個訪問密鑰(AccessKey)。在調用阿裡雲API時您需要使用AccessKey完成身份驗證。 背景信息 AccessKey包括AccessKey ID和AccessKe ...
  • AAAA AAAA即認證、授權、審計、賬號(Authentication、Authorization、Audit、Account)。在安全領域我們繞不開的兩個問題: 授權過程可靠:讓第三方程式能夠訪問所需資源又不泄露用戶數據,常用的多方授權協議主要有 OAuth2 和 SAML 2.0 授權結果可控 ...
  • 1.前言 hi,大家好,我是三合,距離上一篇博客已經過去了整整兩年,這兩年裡,博主通關了《人生》這個游戲里的兩大關卡,買房和結婚。最近閑了下來,那麼當然要繼續寫博客了,今天這篇博客的主要內容是,net core/.net6中,如何利用SummerBoot(點我打開詳情介紹)中的feign模塊快速接入 ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 在CentOS中使用yum命令出現報錯: 今天想給linux裝個git 管理代碼 執行命令: 1 yum -y install git 然後出現了這麼一句 error:There are no enabled repositories in "/ ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 一、鏡像 鏡像是一種輕量級、可執行的獨立軟體包,它包含運行某個軟體所需的所有內容,我們把應用程式和配置依賴打包形成一個可交付的運行環境(包括代碼、運行時需要的庫、環境變數和配置文件等),這個打包好的運行環境就是image鏡像文件 1.鏡像分層 以 ...
  • 一、基本說明 • Oracle 中的函數可以返回表類型,但是這個表類型實際上是集合類型(與數組類似)。從 Oracle 9i 開始,提供了一個叫做"管道化表函數"來解決此問題。 • 管道化表函數,必須返回一個集合類型,且標明 pipelined。它不能返回具體變數,必須以一個空 return 返回, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...