Apache Arrow DataFusion原理與架構

来源:https://www.cnblogs.com/leometeor/archive/2023/05/15/17397333.html
-Advertisement-
Play Games

本篇主要介紹了一種使用Rust語言編寫的查詢引擎——DataFusion,其使用了基於Arrow格式的記憶體模型,結合Rust語言本身的優勢,達成了非常優秀的性能指標 DataFusion是一個查詢引擎而非資料庫,因此其本身不具備存儲數據的能力。但正因為不依賴底層存儲的格式,使其成為了一個靈活可擴展的 ...


本篇主要介紹了一種使用Rust語言編寫的查詢引擎——DataFusion,其使用了基於Arrow格式的記憶體模型,結合Rust語言本身的優勢,達成了非常優秀的性能指標

DataFusion是一個查詢引擎而非資料庫,因此其本身不具備存儲數據的能力。但正因為不依賴底層存儲的格式,使其成為了一個靈活可擴展的查詢引擎。它原生支持了查詢CSV,Parquet,Avro,Json等存儲格式,也支持了本地,AWS S3,Azure Blob Storage,Google Cloud Storage等多種數據源。同時還提供了豐富的擴展介面,可以方便的讓我們接入自定義的數據格式和數據源。

DataFusion具有以下特性:

  • 高性能:基於Rust,不用進行垃圾回收;基於Arrow記憶體模型,列式存儲,方便向量化計算
  • 連接簡單:能夠與Arrow的其他生態互通
  • 集成和定製簡單:可以擴展數據源,方法和運算元等
  • 完全基於Rust編寫:高質量

基於DataFusion我們可以輕鬆構建高性能、高質量、可擴展的數據處理系統。

DBMS 與 Query Engine 的區別

DBMS: DataBase Management System

DBMS是一個包含完整資料庫管理特性的系統,主要包含以下幾個模塊:

  • 存儲系統
  • 元數據(Catalog)
  • 查詢引擎(Query Engine)
  • 訪問控制和許可權
  • 資源管理
  • 管理工具
  • 客戶端
  • 多節點管理

Query Engine

DataFusion是一種查詢引擎,查詢引擎屬於資料庫管理系統的一部分。查詢引擎是用戶與資料庫交互的主要介面,主要作用是將面向用戶的高階查詢語句翻譯成可被具體執行的數據處理單元操作,然後執行操作獲取數據。

DataFusion架構

架構詳情

image

DataFusion查詢引擎主要由以下幾部分構成:

  1. 前端
    • 語法解析
    • 語義分析
    • Planner:語法樹轉換成邏輯計劃

主要涉及DFParserSqlToRel這兩個struct

  1. 查詢中間表示
    • Expression(表達式)/ Type system(類型系統)
    • Query Plan / Relational Operators(關係運算元)
    • Rewrites / Optimizations(邏輯計劃優化)

主要涉及LogicalPlanExpr這兩個枚舉類

  1. 查詢底層表示
    • Statistics(物理計划算子的統計信息,輔助物理計劃優化)
    • Partitions(分塊,多線程執行物理計划算子)
    • Sort orders(物理計划算子對數據是否排序)
    • Algorithms(物理計划算子的執行演算法,如Hash join和Merge join)
    • Rewrites / Optimizations(物理計劃優化)

主要涉及PyhsicalPlanner這個trait實現的邏輯計划到物理計劃的轉換,其中主要的關鍵點是ExecutionPlanPhysicalExpr

  1. 執行運行時(運算元)
    • 分配資源
    • 向量化計算

主要涉及所有執行運算元,如GroupedHashAggregateStream

擴展點

DataFusion查詢引擎的架構還是比較簡單的,其中的擴展點也非常清晰,我們可以從以下幾個方面對DataFusion進行擴展:

用戶自定義函數UDF

無狀態方法

/// 邏輯表達式枚舉類
pub enum Expr {
    ...
    ScalarUDF {
        /// The function
        fun: Arc<ScalarUDF>,
        /// List of expressions to feed to the functions as arguments
        args: Vec<Expr>,
    },
    ...
}
/// UDF的邏輯表達式
pub struct ScalarUDF {
    /// 方法名
    pub name: String,
    /// 方法簽名
    pub signature: Signature,
    /// 返回值類型
    pub return_type: ReturnTypeFunction,
    /// 方法實現
    pub fun: ScalarFunctionImplementation,
}
/// UDF的物理表達式
pub struct ScalarFunctionExpr {
    fun: ScalarFunctionImplementation,
    name: String,
    /// 參數表達式列表
    args: Vec<Arc<dyn PhysicalExpr>>,
    return_type: DataType,
}

用戶自定義聚合函數UADF

有狀態方法

/// 邏輯表達式枚舉類
pub enum Expr {
    ...
    AggregateUDF {
        /// The function
        fun: Arc<AggregateUDF>,
        /// List of expressions to feed to the functions as arguments
        args: Vec<Expr>,
        /// Optional filter applied prior to aggregating
        filter: Option<Box<Expr>>,
    },
    ...
}
/// UADF的邏輯表達式
pub struct AggregateUDF {
    /// 方法名
    pub name: String,
    /// 方法簽名
    pub signature: Signature,
    /// 返回值類型
    pub return_type: ReturnTypeFunction,
    /// 方法實現
    pub accumulator: AccumulatorFunctionImplementation,
    /// 需要保存的狀態的類型
    pub state_type: StateTypeFunction,
}
/// UADF的物理表達式
pub struct AggregateFunctionExpr {
    fun: AggregateUDF,
    args: Vec<Arc<dyn PhysicalExpr>>,
    data_type: DataType,
    name: String,
}

用戶自定義優化規則

Optimizer定義了承載優化規則的結構體,其中optimize方法實現了邏輯計劃優化的過程。優化規則列表中的每個優化規則會被以TOP-DOWNBOTTOM-UP方式作用於邏輯計劃樹,優化規則列表會被實施多個輪次。我們可以通過實現OptimizerRule這個trait來實現自己的優化邏輯。

pub struct Optimizer {
    /// All rules to apply
    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

pub trait OptimizerRule {
    /// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be
    /// optimized by this rule.
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>>;

    ...
}

用戶自定義邏輯計划算子

/// 邏輯計划算子枚舉類
pub enum LogicalPlan {
    ...
    Extension(Extension),
    ...
}
/// 自定義邏輯計划算子
pub struct Extension {
    /// The runtime extension operator
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
/// 自定義邏輯計划算子需要實現的trait
pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { ... }

用戶自定義物理計划算子

/// 為自定義的邏輯計划算子`UserDefinedLogcialNode`生成對應的物理計划算子
pub trait ExtensionPlanner {
    async fn plan_extension(
        &self,
        planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
/// DataFusion預設的邏輯計划到物理計劃的轉換器提供了自定義轉換過程的結構體
pub struct DefaultPhysicalPlanner {
    extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}
/// 自定義物理計划算子需要實現的trait
pub trait ExecutionPlan: Debug + Send + Sync { ... }

用戶自定義數據源

可以看出,自定義數據源其實就是生成一個對應的ExecutionPlan執行計劃,這個執行計劃實施的是掃表的任務。如果數據源支持下推的能力,我們在這裡可以將projection filters limit等操作下推到掃表時。

/// 自定義數據源需要實現的trait
pub trait TableProvider: Sync + Send {
    ...
    async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    ...
}

用戶自定義元數據

pub trait CatalogProvider: Sync + Send {
    ...
	
    /// 根據名稱獲取Schema
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
    /// 註冊Schema
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        // use variables to avoid unused variable warnings
        let _ = name;
        let _ = schema;
        Err(DataFusionError::NotImplemented(
            "Registering new schemas is not supported".to_string(),
        ))
    }
}

pub trait SchemaProvider: Sync + Send {
    ...
    /// 根據表名獲取數據源
    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
    /// 註冊數據源
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        Err(DataFusionError::Execution(
            "schema provider does not support registering tables".to_owned(),
        ))
    }
    ...
}

邏輯計劃(LogicalPlan)

邏輯計劃其實就是數據流圖,數據從葉子節點流向根節點

let df: DataFrame = ctx.read_table("http_api_requests_total")?
            .filter(col("path").eq(lit("/api/v2/write")))?
            .aggregate([col("status")]), [count(lit(1))])?;

這裡我們就使用DataFusion的API介面構造了一個數據流,首先read_table節點會從數據源中掃描數據到記憶體中,然後經過filter節點按照條件進行過濾,最後經過aggregate節點進行聚合。數據流過最後的節點時,就生成了我們需要的數據。

上述鏈式調用的API介面實際上並沒有真正執行對數據的操作,這裡實際上是使用了建造者模式構造了邏輯計劃樹。最終生成的DataFrame實際上只是包含了一下信息:

pub struct DataFrame {
    /// 查詢上下文信息,包含了元數據,用戶註冊的UDF和UADF,使用的優化器,使用的planner等信息
    session_state: SessionState,
    /// 邏輯計劃樹的根節點
    plan: LogicalPlan,
}

支持的邏輯計划算子

點擊查看代碼
Projection
Filter
Window
Aggregate
Sort
Join
TableScan

Repartition
Union
Subquery
Limit
Extension
Distinct

Values
Explain
Analyze
SetVariable
Prepare
Dml(...)

CreateExternalTable
CreateView
CreateCatalogSchema
CreateCatalog
DropTable
DropView

邏輯計劃優化

目標:確保結果相同的情況下,執行更快

image

初始的邏輯計劃,需要經過多個輪次的優化,才能生成執行效率更高的邏輯計劃。DataFusion本身的優化器內置了很多優化規則,用戶也可以擴展自己的優化規則。

內置優化輪次

  1. 下推(Pushdown):減少從一個節點到另一個節點的數據的行列數

    • PushDownProjection
    • PushDownFilter
    • PushDownLimit
  2. 簡化(Simplify):簡化表達式,減少運行時的運算。例如使用布爾代數的法則,將b > 2 AND b > 2簡化成b > 2

    • SimplifyExpressions
    • UnwrapCastInComparison
  3. 簡化(Simplify):刪除無用的節點

  4. 平鋪子查詢(Flatten Subqueries):將子查詢用join重寫

    • DecorrelateWhereExists
    • DecorrelatedWhereIn
    • ScalarSubqueryToJoin
  5. 優化join:識別join謂詞

    • ExtractEqualJoinPredicate
    • RewriteDisjunctivePredicate
    • FilterNullJoinKeys
  6. 優化distinct

    • SingleDistinctToGroupBy
    • ReplaceDistinctWithAggregate

表達式運算(Expression Evaluation)

假設現在有這樣一個謂詞表達式

path = '/api/v2/write' or path is null

經過語法解析和轉換後,可以用如下表達式樹表示:

image

DataFusion在實施表達式運算時,使用了Arrow提供的向量化計算方法來加速運算

image

物理計劃(ExecutionPlan)

image

調用DataFusion提供的DefaultPhysicalPlanner中的create_physical_plan方法,可以將邏輯計劃樹轉換成物理計劃樹。其中物理計劃樹中的每個節點都是一個ExecutionPlan。執行物理計劃樹時,會從根節點開始調用execute方法,調用該方法還沒有執行對數據的操作,僅僅是將每個物理計划算子轉換成一個RecordBatchStream運算元,形成數據流運算元樹。這些RecordBatchStream運算元都實現了future包提供的Stream特性,當我們最終調用RecordBatchStreamcollect方法時,才會從根節點開始poll一次來獲取一下輪要處理的數據,根節點的poll方法內會調用子節點的poll方法,最終每poll一次,整棵樹都會進行一次數據從葉子節點到根節點的流動,生成一個RecordBatch

image

DataFusion實現的物理計划算子具有以下特性:

  • 非同步:避免了阻塞I/O
  • 流式:數據是流式處理的
  • 向量化:每次可以向量化地處理一個RecordBatch
  • 分片:每個運算元都可以並行,可以產生多個分片
  • 多核

結語

DataFusion本身只是一個簡單,高效,可擴展的查詢引擎框架,用戶可以將DataFusion作為開發大型數據中台的基礎組件,也可以輕易地將DataFusion嵌入服務中作為查詢引擎,也可以使用DataFusion構建自己的資料庫系統。如果期望使用分散式的查詢引擎,可以關註基於ArrowDataFusion搭建的分散式查詢引擎Ballista


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • EF命令行工具 migrate.exe 進行Code First更新資料庫,6.3+使用ef6.exe 使用EF的Code First遷移可以用於從Visual Studio內部更新資料庫,但也可通過命令行工具 migrate.exe 進行執行。 如果項目已經更新到伺服器,後面的更新資料庫分為兩種辦 ...
  • ABP框架 ABP是用於創建現代化Web應用程式的完整體繫結構和強大的基礎架構,以模塊化的方式進行開發,所有模塊以nuget包的方式提供,開箱即用,遵循最佳實踐和約定,提供SOLID開發經驗。 | 縮寫 | 英文 | 中文 | |--|--|--| | SRP | The Single Respon ...
  • 以沁恆的FreeRTOS示例項目為例, 說明如何在 CH32V208 評估上運行 FreeRTOS, 以及運行 FreeRTOS 涉及的庫文件改動. ...
  • 原創文檔編寫不易,未經許可請勿轉載。文檔中有疑問的可以郵件聯繫我。 郵箱:[email protected] 文章基於CentOS 7.8系統使用Containerdr作為容器運行時通過kubeadm指導搭建k8s單機master集群,使用calico作為k8s集群的網路插件。K8S官方在1.24版本 ...
  • ClickHouse 屬於 OLAP 資料庫, 與 OLTP (Transaction Process) 相比, 註重數據分析, 重點在查詢的性能. 在業務系統中, 往往使用 OLTP 資料庫做業務數據存儲, 用 OLAP 資料庫做查詢分析, 在一些場景下ClickHouse可以取代ES(Elast... ...
  • 一.初識Redis 1.什麼是Redis ​ Redis是一個速度非常快的非關係型資料庫(non-relational database),它可以存儲鍵(key)與五種不同類型的值的映射(mapping),可以將存儲在記憶體的鍵值對數據持久化到磁碟,可以使用複製特性來擴展讀性能,也可以採用客戶端分片來... ...
  • 上一章主要作了晶元介紹,這一章主要作對開發環境的介紹。 認識Arduino Arduino是一款便捷靈活、方便上手的開源電子原型平臺。包含硬體(各種型號的Arduino板)和軟體(ArduinoIDE)。它構建於開放原始碼simple I/O介面版,並且具有使用類似Java、C語言的Processi ...
  • 本文首發於公眾號:Hunter後端 原文鏈接:Redis數據結構二之SDS和雙向鏈表 這一篇筆記介紹一下 SDS(simple dynamic string)和雙向鏈表。 以下是本篇筆記目錄: SDS 常數複雜度獲取字元串長度 杜絕緩衝區溢出 減少修改字元串帶來的記憶體重分配次數 二進位安全 相容C字 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...