Apache Hudi 在袋鼠雲數據湖平臺的設計與實踐

来源:https://www.cnblogs.com/DTinsight/archive/2023/05/24/17427402.html
-Advertisement-
Play Games

在大數據處理中,[實時數據分析](https://www.dtstack.com/dtengine/easylake?src=szsm)是一個重要的需求。隨著數據量的不斷增長,對於實時分析的挑戰也在不斷加大,傳統的批處理方式已經不能滿足[實時數據處理](https://www.dtstack.com ...


在大數據處理中,實時數據分析是一個重要的需求。隨著數據量的不斷增長,對於實時分析的挑戰也在不斷加大,傳統的批處理方式已經不能滿足實時數據處理的需求,需要一種更加高效的技術來解決這個問題。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是這樣一種技術,提供了高效的實時數據倉庫管理功能。

本文將介紹袋鼠雲基於 Hudi 構建數據湖的整體方案架構及其在實時數據倉庫處理方面的特點,並且為大家展示一個使用 Apache Hudi 的簡單示例,便於新手上路。

Apache Hudi 介紹

Apache Hudi 是一個開源的數據湖存儲系統,可以在 Hadoop 生態系統中提供實時數據倉庫處理功能。Hudi 最早由 Uber 開發,後來成為 Apache 頂級項目。

Hudi 主要特性

· 支持快速插入和更新操作,以便在數據倉庫中實時處理數據;

· 提供增量查詢功能,可有效提高數據分析效率;

· 支持時間點查詢,以便查看數據在某一時刻的狀態;

· 與 Apache Spark、Hive 等大數據分析工具相容。

Hudi 架構

Apache Hudi 的架構包括以下幾個主要組件:

· Hudi 數據存儲:Hudi 數據存儲是 Hudi 的核心組件,負責存儲數據,數據存儲有兩種類型:Copy-On-Write(COW)和 Merge-On-Read(MOR);

· Copy-On-Write:COW 存儲類型會在對數據進行更新時,創建一個新的數據文件副本,將更新的數據寫入副本中,之後,新的數據文件副本會替換原始數據文件;

· Merge-On-Read:MOR 存儲類型會在查詢時,將更新的數據與原始數據進行合併,這種方式可以減少數據存儲的寫入延遲,但會增加查詢的計算量;

· Hudi 索引:Hudi 索引用於維護數據記錄的位置信息,索引有兩種類型:內置索引(如 Bloom 過濾器)和外部索引(如 HBase 索引);

· Hudi 查詢引擎:Hudi 查詢引擎負責處理查詢請求,Hudi 支持多種查詢引擎,如 Spark SQL、Hive、Presto 等。

file

Hudi 的使用場景

Apache Hudi 可以幫助企業和組織實現實時數據處理和分析。實時數據處理需要快速地處理和查詢數據,同時還需要保證數據的一致性和可靠性。

Apache Hudi 的增量數據處理ACID 事務性保證、寫時合併等技術特性可以幫助企業更好地實現實時數據處理和分析,基於 Hudi 的特性可以在一定程度上在實時數倉的構建過程中承擔上下游數據鏈路的對接(類似 Kafka 的角色)。既能實現增量的數據處理,也能為批流一體的處理提供存儲基礎。

Hudi 的優勢和劣勢

● 優勢

· 高效處理大規模數據集;

· 支持實時數據更新和查詢;

· 實現了增量寫入機制,提高了數據訪問效率;

· Hudi 可以與流處理管道集成;

· Hudi 提供了時間旅行功能,允許回溯數據的歷史版本。

● 劣勢

· 在讀寫數據時需要付出額外的代價;

· 操作比較複雜,需要使用專業的編程語言和工具。

Hudi 在袋鼠雲數據湖平臺上的實踐

Hudi 在袋鼠雲數據湖的技術架構

Hudi 在袋鼠雲的數據湖平臺上主要對數據湖管理提供助力:

· 元數據的接入,讓用戶可以快速的對錶進行管理;

· 數據快速接入,包括對符合條件的原有表數據進行轉換,快速搭建數據湖能力;

· 湖表的管理,監控小文件定期進行合併,提升表的查詢性能,內在豐富的表操作功能,包括 time travel ,孤兒文件清理,過期快照清理等;

· 索引構建,提供多種索引包括 bloom filter,zorder 等,提升計算引擎的查詢性能。

file

Hudi 使用示例

在介紹了 Hudi 的基本信息和袋鼠雲數據湖平臺的結構之後,我們來看一個使用示例,替換 Flink 在記憶體中的 join 過程。

在 Flink 中對多流 join 往往是比較頭疼的場景,需要考慮 state ttl 時間設置,設置太小數據經常關聯不上,設置太大記憶體又需要很高才能保留,我們通過 Hudi 的方式來換個思路實現。

● 構建 catalog

public String createCatalog(){
    String createCatalog = "CREATE CATALOG hudi_catalog WITH (\n" +
            "    'type' = 'hudi',\n" +
            "    'mode' = 'hms',\n" +
            "    'default-database' = 'default',\n" +
            "    'hive.conf.dir' = '/hive_conf_dir',\n" +
            "    'table.external' = 'true'\n" +
            ")";

    return createCatalog;
}

● 創建 hudi 表

public String createHudiTable(){

    String createTable = "CREATE TABLE if not exists hudi_catalog.flink_db.test_hudi_flink_join_2 (\n" +
            "  id int ,\n" +
            "  name VARCHAR(10),\n" +
            "  age int ,\n" +
            "  address VARCHAR(10),\n" +
            "  dt VARCHAR(10),\n" +
            "  primary key(id) not enforced\n" +
            ")\n" +
            "PARTITIONED BY (dt)\n" +
            "WITH (\n" +
            "  'connector' = 'hudi',\n" +
            "  'table.type' = 'MERGE_ON_READ',\n" +
            "  'changelog.enabled' = 'true',\n" +
            "  'index.type' = 'BUCKET',\n" +
            "  'hoodie.bucket.index.num.buckets' = '2',\n" +
            String.format("  '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE) +
            "  'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "'\n" +

    ");";

    return createTable;
}

● 更新 hudi 表的 flink_db.test_hudi_flink_join_2 的 id, name, age, dt 列

01 從 kafka 中讀取 topic1

public String createKafkaTable1(){
    String kafkaSource1 = "CREATE TABLE source1\n" +
            "(\n" +
            "    id        INT,\n" +
            "    name      STRING,\n" +
            "    age        INT,\n" +
            "    dt        String,\n" +
            "    PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "      'connector' = 'kafka'\n" +
            "      ,'topic' = 'join_topic1'\n" +
            "      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +
            "      ,'scan.startup.mode' = 'earliest-offset'\n" +
            "      ,'format' = 'json'\n" +
            "      ,'json.timestamp-format.standard' = 'SQL'\n" +
            "      )";

    return kafkaSource1;
}

02 從 kafka 中讀取 topic2

public String createKafkaTable2(){
    String kafkaSource2 = "CREATE TABLE source2\n" +
            "(\n" +
            "    id        INT,\n" +
            "    name      STRING,\n" +
            "    address        string,\n" +
            "    dt        String,\n" +
            "    PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "      'connector' = 'kafka'\n" +
            "      ,'topic' = 'join_topic2'\n" +
            "      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +
            "      ,'scan.startup.mode' = 'earliest-offset'\n" +
            "      ,'format' = 'json'\n" +
            "      ,'json.timestamp-format.standard' = 'SQL'\n" +
            "      )";

    return kafkaSource2;
}

● 執行插入邏輯1

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,age,dt) " +
        "select id, name,age,dt from source1";

● 通過 spark 查詢數據

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 NULL 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 NULL 1

● 執行插入邏輯2

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,address,dt) " +
        "select id, name, address,dt from source2";

● 運行成功

運行成功後在 spark 中查詢對應的表數據:

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 xc:address45 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 xc:address30 1

可以發現在第二次數據運行之後,表數據的對應欄位 address 已經更新,達到了類似在 Flink 中直接執行 join 的效果。

`insert into hudi_catalog.flink_db.test_hudi_flink_join_2

select a.id, a.name, a.age,b.address a.dt from source1 a left join source2 b on a.id = b.id `

《數棧產品白皮書》:https://www.dtstack.com/resources/1004?src=szsm

《數據治理行業實踐白皮書》下載地址:https://www.dtstack.com/resources/1001?src=szsm

想瞭解或咨詢更多有關袋鼠雲大數據產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠雲官網:https://www.dtstack.com/?src=szbky

同時,歡迎對大數據開源項目有興趣的同學加入「袋鼠雲開源框架釘釘技術qun」,交流最新開源技術信息,qun號碼:30537511,項目地址:https://github.com/DTStack


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 官網下載 Java 您可以從官方網站下載 Java 的最新穩定版本。 官網地址:https://www.oracle.com/technetwork/java/javase/overview/index.html 安裝 Java 有些電腦可能已經安裝了Java。 要檢查Windows PC上是否安裝 ...
  • 小白一枚,今天來給大家分享一下如何將自己編寫的桌面應用程式進行打包,便於在其他的電腦上進行安裝使用。如有錯誤之處請大家指正,謝謝!好了,廢話不多說開乾。 這裡簡要的介紹一下使用QtCreator編寫的應用程式的打包。 1. 將Debug模式切換稱為ewlease模式進行編譯,編譯後沒有任何錯誤即可( ...
  • 開發板:NanoPC-T4開發板eMMC:16GBLPDDR3:4GB顯示屏:15.6 HDMI介面顯示屏u-boot :2023.04 在前面我們已經介紹了編譯Rockchip官方提供的uboot源碼,並下載到開發板中進行測試運行。這一節我們嘗試下載最新的uboot版本試試,當前最新版本為2023 ...
  • > 本文首發於公眾號:Hunter後端 > 原文鏈接:[es筆記六之聚合操作之指標聚合](https://mp.weixin.qq.com/s/UyiZ2bzFxi7zCGmL1Xf3CQ) 聚合操作,在 es 中的聚合可以分為大概四種聚合: * bucketing(桶聚合) * mertic(指標 ...
  • 原文地址:[https://blog.fanscore.cn/a/53/](https://blog.fanscore.cn/a/53/) # 1. 前言 本文是[與世界分享我剛編的轉發ntunnel_mysql.php的工具](https://blog.fanscore.cn/a/47/)的後續, ...
  • 這個資料庫主要包含兩個表,考慮到原破解APP數據就是這樣也就不折分了,一個是有5186條記錄的腦筯急轉彎表,一個是有18326條記錄的謎語表,兩個表中的記錄都有詳細的分類欄位,具體看截圖下的分類統計。 腦筋急轉彎分類統計:燈謎(79)、動物(81)、兒童(190)、搞笑(77)、經典(110)、冷笑 ...
  • 今天這個數據是一款字典的類型的軟體,專門用來查詢一些學術上面的名詞的中英對照,超過180個學科分類,150多萬條記錄,伴隨您悠游於學海之中,是您做學問、寫論文的好幫手。 主要科目有:電子計算機名詞(107213)、電機工程名詞(100395)、電力工程(68379)、外國地名譯名(64487)、機械 ...
  • 本文是How to find the TLS used for the SQL Server connection這篇英語文章的翻譯,此文出處請見於文章底部鏈接:原文出處[1] 對於客戶,我做了一些研究,如何找出SQL Server資料庫會話連接使用了哪一種TLS協議。唯一的方式就是創建一個擴展事件 ...
一周排行
    -Advertisement-
    Play Games
  • 人臉識別技術在現代社會中扮演著越來越重要的角色,比如人臉識別門禁、人臉識別支付、甚至人臉識別網站登錄等。 最近有群友問.NET有沒有人臉識別的組件,小編查閱相關資料介紹下麵幾種.NET人臉識別組件供大家參考。 **1、Microsoft Azure Face API** 簡介:Microsoft A ...
  • # 1. 與 .NET Core 緩存的關係和差異 ABP 框架中的緩存系統核心包是 [Volo.Abp.Caching](https://www.nuget.org/packages/Volo.Abp.Caching) ,而對於分散式緩存的支持,abp 官方提供了基於 Redis 的方案,需要安裝 ...
  • 最近ET做熱更重載dll的時候,返回登陸會重新檢測新的dll,首次登錄之前已經Assembly.Load()過一次dll,第二次返回登陸再次load dll到記憶體中,Invoke執行方法的時候,異常了,有些方法執行了,有些未執行,於是查資料,看到些老資料說Assembly.Load重覆載入同名dll ...
  • 1. 擴展方法 擴展方法使你能夠向現有類型“添加”方法,而無需創建新的派生類型、重新編譯或以其他方式修改原始類型。 擴展方法是一種靜態方法,但可以像擴展類型上的實例方法一樣進行調用。 對於用 C#、F# 和 Visual Basic 編寫的客戶端代碼,調用擴展方法與調用在類型中定義的方法沒有明顯區別 ...
  • 以前在隨筆《Winform開發框架之客戶關係管理系統(CRM)的開發總結系列1-界面功能展示 》的幾篇隨筆中介紹過基於WInform開發框架開發的CRM系統,系統的功能主要也是圍繞著客戶相關信息來進行管理的。本篇隨筆介紹在最新的《SqlSugar開發框架》中整合CRM系統模塊的功能。 ...
  • 隨著技術的發展,ASP.NET Core MVC也推出了好長時間,經過不斷的版本更新迭代,已經越來越完善,本系列文章主要講解ASP.NET Core MVC開發B/S系統過程中所涉及到的相關內容,適用於初學者,在校畢業生,或其他想從事ASP.NET Core MVC 系統開發的人員。 經過前幾篇文章... ...
  • [toc] 這篇文章是我之前總結的一篇文章,因為整理博客的原因,原有博客已經註銷,但這篇文章對一些讀者很有用,所以現在新瓶裝舊酒重新整理回來分享給大家。 最近一段時間生產環境頻繁出問題,每次都會生成一個hs_err_pid*.log文件,因為工作內容的原因,在此之前並沒有瞭解過相關內容,趁此機會學習 ...
  • # 前言 在上一篇文章中,給大家講解了泛型的概念、作用、使用場景,以及泛型集合、泛型介面和泛型類的用法,但受限於篇幅,並沒有把泛型的內容講解完畢。所以今天我們會繼續學習泛型方法、泛型擦除,以及通配符等的內容,希望大家繼續做好學習的準備哦。 *** 全文大約【**4600】** 字,不說廢話,只講可以 ...
  • 昨天遇到參數key大小寫不一致導致校驗簽名失敗的問題,查了很長時間才找到原因。看了一下FastJson源碼,發現JSON.toObject中轉換成對象的時候會忽略大小寫。 所以,當使用了JSON.toObject將json轉成Java對象後,再用JSON.toObject轉成json,key值就變了 ...
  • 基於java的線上商城設計與實現,線上購物平臺,校園購物商城,商品銷售平臺,基於Java的電商平臺;電商平臺,買家和賣家可以在此平臺上進行銷售和交易,節約了大量的線下時間成本,購物車的功能,校園交易平臺等等; ...