本篇主要介紹了一種使用Rust語言編寫的查詢引擎——DataFusion,其使用了基於Arrow格式的記憶體模型,結合Rust語言本身的優勢,達成了非常優秀的性能指標 DataFusion是一個查詢引擎而非資料庫,因此其本身不具備存儲數據的能力。但正因為不依賴底層存儲的格式,使其成為了一個靈活可擴展的 ...
本篇主要介紹了一種使用Rust語言編寫的查詢引擎——DataFusion,其使用了基於Arrow格式的記憶體模型,結合Rust語言本身的優勢,達成了非常優秀的性能指標
DataFusion是一個查詢引擎而非資料庫,因此其本身不具備存儲數據的能力。但正因為不依賴底層存儲的格式,使其成為了一個靈活可擴展的查詢引擎。它原生支持了查詢CSV,Parquet,Avro,Json等存儲格式,也支持了本地,AWS S3,Azure Blob Storage,Google Cloud Storage等多種數據源。同時還提供了豐富的擴展介面,可以方便的讓我們接入自定義的數據格式和數據源。
DataFusion具有以下特性:
- 高性能:基於Rust,不用進行垃圾回收;基於Arrow記憶體模型,列式存儲,方便向量化計算
- 連接簡單:能夠與Arrow的其他生態互通
- 集成和定製簡單:可以擴展數據源,方法和運算元等
- 完全基於Rust編寫:高質量
基於DataFusion我們可以輕鬆構建高性能、高質量、可擴展的數據處理系統。
DBMS 與 Query Engine 的區別
DBMS: DataBase Management System
DBMS是一個包含完整資料庫管理特性的系統,主要包含以下幾個模塊:
- 存儲系統
- 元數據(Catalog)
- 查詢引擎(Query Engine)
- 訪問控制和許可權
- 資源管理
- 管理工具
- 客戶端
- 多節點管理
Query Engine
DataFusion是一種查詢引擎,查詢引擎屬於資料庫管理系統的一部分。查詢引擎是用戶與資料庫交互的主要介面,主要作用是將面向用戶的高階查詢語句翻譯成可被具體執行的數據處理單元操作,然後執行操作獲取數據。
DataFusion架構
架構詳情
DataFusion查詢引擎主要由以下幾部分構成:
- 前端
- 語法解析
- 語義分析
- Planner:語法樹轉換成邏輯計劃
主要涉及
DFParser
和SqlToRel
這兩個struct
- 查詢中間表示
- Expression(表達式)/ Type system(類型系統)
- Query Plan / Relational Operators(關係運算元)
- Rewrites / Optimizations(邏輯計劃優化)
主要涉及
LogicalPlan
和Expr
這兩個枚舉類
- 查詢底層表示
- Statistics(物理計划算子的統計信息,輔助物理計劃優化)
- Partitions(分塊,多線程執行物理計划算子)
- Sort orders(物理計划算子對數據是否排序)
- Algorithms(物理計划算子的執行演算法,如Hash join和Merge join)
- Rewrites / Optimizations(物理計劃優化)
主要涉及
PyhsicalPlanner
這個trait
實現的邏輯計划到物理計劃的轉換,其中主要的關鍵點是ExecutionPlan
和PhysicalExpr
- 執行運行時(運算元)
- 分配資源
- 向量化計算
主要涉及所有執行運算元,如
GroupedHashAggregateStream
擴展點
DataFusion查詢引擎的架構還是比較簡單的,其中的擴展點也非常清晰,我們可以從以下幾個方面對DataFusion進行擴展:
用戶自定義函數UDF
無狀態方法
/// 邏輯表達式枚舉類
pub enum Expr {
...
ScalarUDF {
/// The function
fun: Arc<ScalarUDF>,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
},
...
}
/// UDF的邏輯表達式
pub struct ScalarUDF {
/// 方法名
pub name: String,
/// 方法簽名
pub signature: Signature,
/// 返回值類型
pub return_type: ReturnTypeFunction,
/// 方法實現
pub fun: ScalarFunctionImplementation,
}
/// UDF的物理表達式
pub struct ScalarFunctionExpr {
fun: ScalarFunctionImplementation,
name: String,
/// 參數表達式列表
args: Vec<Arc<dyn PhysicalExpr>>,
return_type: DataType,
}
用戶自定義聚合函數UADF
有狀態方法
/// 邏輯表達式枚舉類
pub enum Expr {
...
AggregateUDF {
/// The function
fun: Arc<AggregateUDF>,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// Optional filter applied prior to aggregating
filter: Option<Box<Expr>>,
},
...
}
/// UADF的邏輯表達式
pub struct AggregateUDF {
/// 方法名
pub name: String,
/// 方法簽名
pub signature: Signature,
/// 返回值類型
pub return_type: ReturnTypeFunction,
/// 方法實現
pub accumulator: AccumulatorFunctionImplementation,
/// 需要保存的狀態的類型
pub state_type: StateTypeFunction,
}
/// UADF的物理表達式
pub struct AggregateFunctionExpr {
fun: AggregateUDF,
args: Vec<Arc<dyn PhysicalExpr>>,
data_type: DataType,
name: String,
}
用戶自定義優化規則
Optimizer
定義了承載優化規則的結構體,其中optimize
方法實現了邏輯計劃優化的過程。優化規則列表中的每個優化規則會被以TOP-DOWN
或BOTTOM-UP
方式作用於邏輯計劃樹,優化規則列表會被實施多個輪次。我們可以通過實現OptimizerRule
這個trait
來實現自己的優化邏輯。
pub struct Optimizer {
/// All rules to apply
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
pub trait OptimizerRule {
/// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be
/// optimized by this rule.
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>>;
...
}
用戶自定義邏輯計划算子
/// 邏輯計划算子枚舉類
pub enum LogicalPlan {
...
Extension(Extension),
...
}
/// 自定義邏輯計划算子
pub struct Extension {
/// The runtime extension operator
pub node: Arc<dyn UserDefinedLogicalNode>,
}
/// 自定義邏輯計划算子需要實現的trait
pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { ... }
用戶自定義物理計划算子
/// 為自定義的邏輯計划算子`UserDefinedLogcialNode`生成對應的物理計划算子
pub trait ExtensionPlanner {
async fn plan_extension(
&self,
planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
/// DataFusion預設的邏輯計划到物理計劃的轉換器提供了自定義轉換過程的結構體
pub struct DefaultPhysicalPlanner {
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}
/// 自定義物理計划算子需要實現的trait
pub trait ExecutionPlan: Debug + Send + Sync { ... }
用戶自定義數據源
可以看出,自定義數據源其實就是生成一個對應的ExecutionPlan執行計劃,這個執行計劃實施的是掃表的任務。如果數據源支持下推的能力,我們在這裡可以將projection
filters
limit
等操作下推到掃表時。
/// 自定義數據源需要實現的trait
pub trait TableProvider: Sync + Send {
...
async fn scan(
&self,
state: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>>;
...
}
用戶自定義元數據
pub trait CatalogProvider: Sync + Send {
...
/// 根據名稱獲取Schema
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
/// 註冊Schema
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
// use variables to avoid unused variable warnings
let _ = name;
let _ = schema;
Err(DataFusionError::NotImplemented(
"Registering new schemas is not supported".to_string(),
))
}
}
pub trait SchemaProvider: Sync + Send {
...
/// 根據表名獲取數據源
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
/// 註冊數據源
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support registering tables".to_owned(),
))
}
...
}
邏輯計劃(LogicalPlan)
邏輯計劃其實就是數據流圖,數據從葉子節點流向根節點
let df: DataFrame = ctx.read_table("http_api_requests_total")?
.filter(col("path").eq(lit("/api/v2/write")))?
.aggregate([col("status")]), [count(lit(1))])?;
這裡我們就使用DataFusion的API介面構造了一個數據流,首先read_table
節點會從數據源中掃描數據到記憶體中,然後經過filter
節點按照條件進行過濾,最後經過aggregate
節點進行聚合。數據流過最後的節點時,就生成了我們需要的數據。
上述鏈式調用的API介面實際上並沒有真正執行對數據的操作,這裡實際上是使用了建造者模式構造了邏輯計劃樹。最終生成的DataFrame
實際上只是包含了一下信息:
pub struct DataFrame {
/// 查詢上下文信息,包含了元數據,用戶註冊的UDF和UADF,使用的優化器,使用的planner等信息
session_state: SessionState,
/// 邏輯計劃樹的根節點
plan: LogicalPlan,
}
支持的邏輯計划算子
點擊查看代碼
Projection
Filter
Window
Aggregate
Sort
Join
TableScan
Repartition
Union
Subquery
Limit
Extension
Distinct
Values
Explain
Analyze
SetVariable
Prepare
Dml(...)
CreateExternalTable
CreateView
CreateCatalogSchema
CreateCatalog
DropTable
DropView
邏輯計劃優化
目標:確保結果相同的情況下,執行更快
初始的邏輯計劃,需要經過多個輪次的優化,才能生成執行效率更高的邏輯計劃。DataFusion本身的優化器內置了很多優化規則,用戶也可以擴展自己的優化規則。
內置優化輪次
-
下推(Pushdown):減少從一個節點到另一個節點的數據的行列數
PushDownProjection
PushDownFilter
PushDownLimit
-
簡化(Simplify):簡化表達式,減少運行時的運算。例如使用布爾代數的法則,將
b > 2 AND b > 2
簡化成b > 2
。SimplifyExpressions
UnwrapCastInComparison
-
簡化(Simplify):刪除無用的節點
-
平鋪子查詢(Flatten Subqueries):將子查詢用join重寫
DecorrelateWhereExists
DecorrelatedWhereIn
ScalarSubqueryToJoin
-
優化join:識別join謂詞
ExtractEqualJoinPredicate
RewriteDisjunctivePredicate
FilterNullJoinKeys
-
優化distinct
SingleDistinctToGroupBy
ReplaceDistinctWithAggregate
表達式運算(Expression Evaluation)
假設現在有這樣一個謂詞表達式
path = '/api/v2/write' or path is null
經過語法解析和轉換後,可以用如下表達式樹表示:
DataFusion在實施表達式運算時,使用了Arrow提供的向量化計算方法來加速運算
物理計劃(ExecutionPlan)
調用DataFusion提供的DefaultPhysicalPlanner
中的create_physical_plan
方法,可以將邏輯計劃樹轉換成物理計劃樹。其中物理計劃樹中的每個節點都是一個ExecutionPlan
。執行物理計劃樹時,會從根節點開始調用execute
方法,調用該方法還沒有執行對數據的操作,僅僅是將每個物理計划算子轉換成一個RecordBatchStream
運算元,形成數據流運算元樹。這些RecordBatchStream
運算元都實現了future
包提供的Stream
特性,當我們最終調用RecordBatchStream
的collect
方法時,才會從根節點開始poll
一次來獲取一下輪要處理的數據,根節點的poll
方法內會調用子節點的poll
方法,最終每poll
一次,整棵樹都會進行一次數據從葉子節點到根節點的流動,生成一個RecordBatch
。
DataFusion實現的物理計划算子具有以下特性:
- 非同步:避免了阻塞I/O
- 流式:數據是流式處理的
- 向量化:每次可以向量化地處理一個
RecordBatch
- 分片:每個運算元都可以並行,可以產生多個分片
- 多核
結語
DataFusion本身只是一個簡單,高效,可擴展的查詢引擎框架,用戶可以將DataFusion作為開發大型數據中台的基礎組件,也可以輕易地將DataFusion嵌入服務中作為查詢引擎,也可以使用DataFusion構建自己的資料庫系統。如果期望使用分散式的查詢引擎,可以關註基於Arrow
和DataFusion
搭建的分散式查詢引擎Ballista。