1. Spark Catalyst Extension Points
The Spark Catalyst extension points were introduced in SPARK-18127. They let Spark users plug custom implementations into each stage of SQL processing, which is both powerful and efficient. The Catalyst Optimizer is the core component of Spark SQL (its query optimizer): it is responsible for turning SQL statements into physical execution plans, so the quality of Catalyst largely determines SQL execution performance. The query optimizer is the heart of any SQL engine. Commonly used open-source optimizers include Apache Calcite (many open-source projects, such as Hive, Phoenix and Drill, adopt Calcite for query optimization) and Orca (used in HAWQ/Greenplum).
2. SparkSessionExtensions
SparkSessionExtensions holds all of the user-defined extension rules. The custom rules are stored in its member variables, and for rules targeting different phases SparkSessionExtensions exposes different interfaces.
API documentation: https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/SparkSessionExtensions.html
From the Scaladoc (excerpt):

class SparkSessionExtensions extends AnyRef

Annotations: @DeveloperApi @Experimental @Unstable

Holder for injection points to the SparkSession. No guarantee is made about the binary or source compatibility of the methods here. It currently provides the following extension points:

- Analyzer Rules.
- Check Analysis Rules.
- Optimizer Rules.
- Pre CBO Rules.
- Planning Strategies.
- Customized Parser.
- (External) Catalog listeners.
- Columnar Rules.
- Adaptive Query Stage Preparation Rules.

The extensions can be used by calling withExtensions on the SparkSession.Builder, for example:

```scala
SparkSession.builder()
  .master("...")
  .config("...", true)
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session => ... }
    extensions.injectParser { (session, parser) => ... }
  }
  .getOrCreate()
```

The extensions can also be used by setting the Spark SQL configuration property spark.sql.extensions. Multiple extensions can be set using a comma-separated list. For example:

```scala
SparkSession.builder()
  .master("...")
  .config("spark.sql.extensions", "org.example.MyExtensions,org.example.YourExtensions")
  .getOrCreate()

class MyExtensions extends Function1[SparkSessionExtensions, Unit] {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule { session => ... }
    extensions.injectParser { (session, parser) => ... }
  }
}

class YourExtensions extends SparkSessionExtensionsProvider {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule { session => ... }
    extensions.injectFunction(...)
  }
}
```

Note that none of the injected builders should assume that the SparkSession is fully initialized and should not touch the session's internals (e.g. the SessionState).

Type members:

- type CheckRuleBuilder = SparkSession => LogicalPlan => Unit
- type ColumnarRuleBuilder = SparkSession => ColumnarRule
- type FunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder)
- type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
- type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
- type RuleBuilder = SparkSession => Rule[LogicalPlan]
- type StrategyBuilder = SparkSession => Strategy
- type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)

Value members:

- injectCheckRule(builder: CheckRuleBuilder): Unit. Injects a check analysis Rule builder into the SparkSession.
- injectColumnar(builder: ColumnarRuleBuilder): Unit. Injects a rule that can override the columnar execution of an executor.
- injectFunction(functionDescription: FunctionDescription): Unit. Injects a custom function into the org.apache.spark.sql.catalyst.analysis.FunctionRegistry at runtime for all sessions.
- injectOptimizerRule(builder: RuleBuilder): Unit. Injects an optimizer Rule builder into the SparkSession.
- injectParser(builder: ParserBuilder): Unit. Injects a custom parser into the SparkSession.
- injectPlannerStrategy(builder: StrategyBuilder): Unit. Injects a planner Strategy builder into the SparkSession.
- injectPostHocResolutionRule(builder: RuleBuilder): Unit. Injects an analyzer Rule builder into the SparkSession that runs after resolution.
- injectPreCBORule(builder: RuleBuilder): Unit. Injects an optimizer Rule builder that rewrites logical plans into the SparkSession.
- injectQueryStagePrepRule(builder: QueryStagePrepRuleBuilder): Unit. Injects a rule that can override the query stage preparation phase of adaptive query execution.
- injectResolutionRule(builder: RuleBuilder): Unit. Injects an analyzer resolution Rule builder into the SparkSession.
- injectTableFunction(functionDescription: TableFunctionDescription): Unit. Injects a custom function into the org.apache.spark.sql.catalyst.analysis.TableFunctionRegistry at runtime for all sessions.
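The article never shows injectFunction in action, so here is a minimal sketch of my own (not from the original post; the function name always_one and the class name MyFunctionExtensions are made up for illustration). It registers a SQL function that ignores its arguments and returns the literal 1:

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}

class MyFunctionExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // FunctionDescription is a (FunctionIdentifier, ExpressionInfo, FunctionBuilder) tuple.
    extensions.injectFunction((
      FunctionIdentifier("always_one"),                            // SQL name of the function
      new ExpressionInfo(classOf[Literal].getName, "always_one"),  // metadata shown by DESCRIBE FUNCTION
      (_: Seq[Expression]) => Literal(1)                           // builder: children => expression
    ))
  }
}
```

After registering this class (via withExtensions or spark.sql.extensions), `SELECT always_one()` should resolve like any built-in function.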
Please credit the author when reposting: Zhang Yongqing, cnblogs: https://www.cnblogs.com/laoqing/p/16351482.html
2.1 Adding custom rules
Users add new custom rules through the inject* methods exposed by SparkSessionExtensions. The available inject interfaces are listed below; a minimal usage sketch follows right after the list.
- injectOptimizerRule – adds a custom optimizer rule; the optimizer is responsible for optimizing the logical plan.
- injectParser – adds a custom parser; the parser is responsible for SQL parsing.
- injectPlannerStrategy – adds a custom planner strategy; the planner is responsible for generating the physical execution plan.
- injectResolutionRule – adds a custom analyzer rule to the Resolution phase; the analyzer is responsible for producing the resolved logical plan.
- injectPostHocResolutionRule – adds a custom analyzer rule to the Post Resolution phase.
- injectCheckRule – adds a custom analyzer check rule.
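As a concrete illustration of one of these entry points, here is a minimal sketch (not from the original article; the names LogPlanRule and MyOptimizerExtensions are made up). It injects a no-op optimizer rule that only logs the plan it sees and returns it unchanged:

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A do-nothing optimizer rule: log the logical plan, then return it as-is.
object LogPlanRule extends Rule[LogicalPlan] with Logging {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo(s"optimizer saw plan:\n${plan.treeString}")
    plan
  }
}

class MyOptimizerExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // The builder receives the SparkSession and returns a Rule[LogicalPlan].
    extensions.injectOptimizerRule { _ => LogPlanRule }
  }
}
```

MyOptimizerExtensions can then be activated either through SparkSession.Builder.withExtensions or through the spark.sql.extensions configuration property described in 2.3.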
Catalyst splits SQL processing into several steps: parser, analyzer, optimizer and planner. Steps such as the analyzer and optimizer are themselves divided into multiple phases. Taking the Analyzer as an example, its analysis rules are partitioned into batches, and the execution strategy can differ from batch to batch: some batches run only once, while others are applied iteratively until a given condition is met.
2.2 Retrieving custom rules
For each kind of custom rule, SparkSessionExtensions also has a corresponding build* method that returns the registered rules of that type. When a Spark session is initialized, it calls these methods to collect the custom rules and passes them on to the parser, analyzer, optimizer and planner objects.
- buildOptimizerRules
- buildParser
- buildPlannerStrategies
- buildResolutionRules
- buildPostHocResolutionRules
- buildCheckRules
2.3 Enabling custom rules
In Spark, user-defined rules can be activated in two ways:

- Use the withExtensions method of SparkSession.Builder. withExtensions is a higher-order function that takes a user-supplied function as its argument; that function receives a SparkSessionExtensions instance, and inside it the user registers custom rules via the inject* methods.
- Use the Spark configuration property spark.sql.extensions. Wrap the function from the first option in a class and pass its fully qualified class name as the property value.
For concrete usage, see the Spark code in the org.apache.spark.sql.SparkSessionExtensionSuite test suite.
3. Extending Spark Catalyst
3.1 Posting a SQL event through the listenerBus
```scala
// The file must live under the org.apache.spark namespace because SparkContext.listenerBus is private[spark].
package org.apache.spark.sql.execution
// Please credit the author when reposting: Zhang Yongqing, cnblogs: https://www.cnblogs.com/laoqing/p/16351482.html

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

// Custom listener event carrying the SQL text of every parsed statement.
case class SqlEvent(sqlText: String, sparkSession: SparkSession)
  extends org.apache.spark.scheduler.SparkListenerEvent with Logging

// A delegating parser: posts a SqlEvent for each statement, then hands the SQL to the original parser.
// Depending on the Spark version, ParserInterface may declare additional methods
// (e.g. parseMultipartIdentifier) that also need to be delegated.
class MySqlParser(sparkSession: SparkSession, val delegate: ParserInterface)
  extends ParserInterface with Logging {

  override def parsePlan(sqlText: String): LogicalPlan = {
    logInfo("start to send SqlEvent by listenerBus, sqlText: " + sqlText)
    sparkSession.sparkContext.listenerBus.post(SqlEvent(sqlText, sparkSession))
    logInfo("send SqlEvent success by listenerBus, sqlText: " + sqlText)
    delegate.parsePlan(sqlText)
  }

  override def parseExpression(sqlText: String): Expression =
    delegate.parseExpression(sqlText)

  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    delegate.parseTableIdentifier(sqlText)

  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    delegate.parseFunctionIdentifier(sqlText)

  override def parseTableSchema(sqlText: String): StructType =
    delegate.parseTableSchema(sqlText)

  override def parseDataType(sqlText: String): DataType =
    delegate.parseDataType(sqlText)
}
```
```scala
import org.apache.spark.sql.SparkSessionExtensions

class MySparkSessionExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectParser { (session, parser) =>
      new MySqlParser(session, parser)
    }
  }
}
```
```scala
SparkSession.builder()
  .master("...")
  .config("spark.sql.extensions", "MySparkSessionExtension")
  .getOrCreate()
```
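The article does not show a consumer for the posted event. Here is a minimal sketch of one possible consumer (the class name SqlEventListener is made up): a SparkListener whose onOtherEvent callback matches the custom SqlEvent:

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.SqlEvent

// Receives every event posted on the listener bus and reacts only to SqlEvent.
class SqlEventListener extends SparkListener with Logging {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SqlEvent(sqlText, _) => logInfo(s"received SqlEvent, sqlText: $sqlText")
    case _                    => // ignore all other events
  }
}

// Register it on the running SparkContext (or configure it via spark.extraListeners):
// spark.sparkContext.addSparkListener(new SqlEventListener)
```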
3.2 Creating a custom Parser
```scala
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.{DataType, StructType}

class StrictParser(parser: ParserInterface) extends ParserInterface {

  /**
   * Parse a string to a [[LogicalPlan]].
   * The plan is walked only for validation: any Project that still contains a "*" is rejected.
   */
  override def parsePlan(sqlText: String): LogicalPlan = {
    val logicalPlan = parser.parsePlan(sqlText)
    logicalPlan transform {
      case project @ Project(projectList, _) =>
        projectList.foreach { name =>
          if (name.isInstanceOf[UnresolvedStar]) {
            throw new RuntimeException("You must specify your project column set," +
              " * is not allowed.")
          }
        }
        project
    }
    logicalPlan
  }

  /** Parse a string to an [[Expression]]. */
  override def parseExpression(sqlText: String): Expression =
    parser.parseExpression(sqlText)

  /** Parse a string to a [[TableIdentifier]]. */
  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    parser.parseTableIdentifier(sqlText)

  /** Parse a string to a [[FunctionIdentifier]]. */
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    parser.parseFunctionIdentifier(sqlText)

  /**
   * Parse a string to a [[StructType]]. The passed SQL string should be a comma separated
   * list of field definitions which will preserve the correct Hive metadata.
   */
  override def parseTableSchema(sqlText: String): StructType =
    parser.parseTableSchema(sqlText)

  /** Parse a string to a [[DataType]]. */
  override def parseDataType(sqlText: String): DataType =
    parser.parseDataType(sqlText)
}
```
Create the extension-point builder functions:
```scala
type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
type ExtensionsBuilder = SparkSessionExtensions => Unit

val parserBuilder: ParserBuilder = (_, parser) => new StrictParser(parser)
val extBuilder: ExtensionsBuilder = { e => e.injectParser(parserBuilder) }
```
The extBuilder function is used when constructing the SparkSession. SparkSessionExtensions.injectParser is itself a higher-order function that takes parserBuilder as its argument: the original parser is passed to the custom StrictParser, and StrictParser is registered in SparkSessionExtensions as the custom parser.
Enable the custom parser in the SparkSession:
```scala
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.master", "local[2]")
  .withExtensions(extBuilder)
  .getOrCreate()
```
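As a quick usage check (my own sketch, not from the original article; the view name t is made up), with StrictParser active a query that lists its columns parses normally, while a select * is rejected at parse time:

```scala
// Register a small temporary view to query against.
spark.range(3).createOrReplaceTempView("t")

// Passes: the column set is listed explicitly.
spark.sql("SELECT id FROM t").show()

// Would throw RuntimeException("You must specify your project column set, * is not allowed."):
// spark.sql("SELECT * FROM t").show()
```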
Spark 2.2 introduced these new extension points, which let users define their own parser, analyzer, optimizer and physical planning strategy rules within a Spark session. With two simple examples we have shown how to use them to implement SQL checks and customized execution-plan optimization. The high extensibility of Spark Catalyst makes it easy to tailor a SQL engine to a concrete use case and opens up many possibilities: implementing a specific SQL dialect, optimizing more deeply for particular data sources, enforcing SQL conventions, applying dedicated optimization strategies for a given execution environment, and so on.
This is an original article by the author; please credit the source when reposting. Original articles remain the property of the author. Reposting is welcome, but copyright is reserved: reposts that do not credit the source will be pursued under copyright law. Please respect the author's work.