Apache Hudi 在袋鼠雲數據湖平臺的設計與實踐

来源:https://www.cnblogs.com/DTinsight/archive/2023/05/24/17427402.html
-Advertisement-
Play Games

在大數據處理中,[實時數據分析](https://www.dtstack.com/dtengine/easylake?src=szsm)是一個重要的需求。隨著數據量的不斷增長,對於實時分析的挑戰也在不斷加大,傳統的批處理方式已經不能滿足[實時數據處理](https://www.dtstack.com ...


在大數據處理中,實時數據分析是一個重要的需求。隨著數據量的不斷增長,對於實時分析的挑戰也在不斷加大,傳統的批處理方式已經不能滿足實時數據處理的需求,需要一種更加高效的技術來解決這個問題。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是這樣一種技術,提供了高效的實時數據倉庫管理功能。

本文將介紹袋鼠雲基於 Hudi 構建數據湖的整體方案架構及其在實時數據倉庫處理方面的特點,並且為大家展示一個使用 Apache Hudi 的簡單示例,便於新手上路。

Apache Hudi 介紹

Apache Hudi 是一個開源的數據湖存儲系統,可以在 Hadoop 生態系統中提供實時數據倉庫處理功能。Hudi 最早由 Uber 開發,後來成為 Apache 頂級項目。

Hudi 主要特性

· 支持快速插入和更新操作,以便在數據倉庫中實時處理數據;

· 提供增量查詢功能,可有效提高數據分析效率;

· 支持時間點查詢,以便查看數據在某一時刻的狀態;

· 與 Apache Spark、Hive 等大數據分析工具相容。

Hudi 架構

Apache Hudi 的架構包括以下幾個主要組件:

· Hudi 數據存儲:Hudi 數據存儲是 Hudi 的核心組件,負責存儲數據,數據存儲有兩種類型:Copy-On-Write(COW)和 Merge-On-Read(MOR);

· Copy-On-Write:COW 存儲類型會在對數據進行更新時,創建一個新的數據文件副本,將更新的數據寫入副本中,之後,新的數據文件副本會替換原始數據文件;

· Merge-On-Read:MOR 存儲類型會在查詢時,將更新的數據與原始數據進行合併,這種方式可以減少數據存儲的寫入延遲,但會增加查詢的計算量;

· Hudi 索引:Hudi 索引用於維護數據記錄的位置信息,索引有兩種類型:內置索引(如 Bloom 過濾器)和外部索引(如 HBase 索引);

· Hudi 查詢引擎:Hudi 查詢引擎負責處理查詢請求,Hudi 支持多種查詢引擎,如 Spark SQL、Hive、Presto 等。

file

Hudi 的使用場景

Apache Hudi 可以幫助企業和組織實現實時數據處理和分析。實時數據處理需要快速地處理和查詢數據,同時還需要保證數據的一致性和可靠性。

Apache Hudi 的增量數據處理ACID 事務性保證、寫時合併等技術特性可以幫助企業更好地實現實時數據處理和分析,基於 Hudi 的特性可以在一定程度上在實時數倉的構建過程中承擔上下游數據鏈路的對接(類似 Kafka 的角色)。既能實現增量的數據處理,也能為批流一體的處理提供存儲基礎。

Hudi 的優勢和劣勢

● 優勢

· 高效處理大規模數據集;

· 支持實時數據更新和查詢;

· 實現了增量寫入機制,提高了數據訪問效率;

· Hudi 可以與流處理管道集成;

· Hudi 提供了時間旅行功能,允許回溯數據的歷史版本。

● 劣勢

· 在讀寫數據時需要付出額外的代價;

· 操作比較複雜,需要使用專業的編程語言和工具。

Hudi 在袋鼠雲數據湖平臺上的實踐

Hudi 在袋鼠雲數據湖的技術架構

Hudi 在袋鼠雲的數據湖平臺上主要對數據湖管理提供助力:

· 元數據的接入,讓用戶可以快速的對錶進行管理;

· 數據快速接入,包括對符合條件的原有表數據進行轉換,快速搭建數據湖能力;

· 湖表的管理,監控小文件定期進行合併,提升表的查詢性能,內在豐富的表操作功能,包括 time travel ,孤兒文件清理,過期快照清理等;

· 索引構建,提供多種索引包括 bloom filter,zorder 等,提升計算引擎的查詢性能。

file

Hudi 使用示例

在介紹了 Hudi 的基本信息和袋鼠雲數據湖平臺的結構之後,我們來看一個使用示例,替換 Flink 在記憶體中的 join 過程。

在 Flink 中對多流 join 往往是比較頭疼的場景,需要考慮 state ttl 時間設置,設置太小數據經常關聯不上,設置太大記憶體又需要很高才能保留,我們通過 Hudi 的方式來換個思路實現。

● 構建 catalog

public String createCatalog(){
    String createCatalog = "CREATE CATALOG hudi_catalog WITH (\n" +
            "    'type' = 'hudi',\n" +
            "    'mode' = 'hms',\n" +
            "    'default-database' = 'default',\n" +
            "    'hive.conf.dir' = '/hive_conf_dir',\n" +
            "    'table.external' = 'true'\n" +
            ")";

    return createCatalog;
}

● 創建 hudi 表

public String createHudiTable(){

    String createTable = "CREATE TABLE if not exists hudi_catalog.flink_db.test_hudi_flink_join_2 (\n" +
            "  id int ,\n" +
            "  name VARCHAR(10),\n" +
            "  age int ,\n" +
            "  address VARCHAR(10),\n" +
            "  dt VARCHAR(10),\n" +
            "  primary key(id) not enforced\n" +
            ")\n" +
            "PARTITIONED BY (dt)\n" +
            "WITH (\n" +
            "  'connector' = 'hudi',\n" +
            "  'table.type' = 'MERGE_ON_READ',\n" +
            "  'changelog.enabled' = 'true',\n" +
            "  'index.type' = 'BUCKET',\n" +
            "  'hoodie.bucket.index.num.buckets' = '2',\n" +
            String.format("  '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE) +
            "  'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "'\n" +

    ");";

    return createTable;
}

● 更新 hudi 表的 flink_db.test_hudi_flink_join_2 的 id, name, age, dt 列

01 從 kafka 中讀取 topic1

public String createKafkaTable1(){
    String kafkaSource1 = "CREATE TABLE source1\n" +
            "(\n" +
            "    id        INT,\n" +
            "    name      STRING,\n" +
            "    age        INT,\n" +
            "    dt        String,\n" +
            "    PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "      'connector' = 'kafka'\n" +
            "      ,'topic' = 'join_topic1'\n" +
            "      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +
            "      ,'scan.startup.mode' = 'earliest-offset'\n" +
            "      ,'format' = 'json'\n" +
            "      ,'json.timestamp-format.standard' = 'SQL'\n" +
            "      )";

    return kafkaSource1;
}

02 從 kafka 中讀取 topic2

public String createKafkaTable2(){
    String kafkaSource2 = "CREATE TABLE source2\n" +
            "(\n" +
            "    id        INT,\n" +
            "    name      STRING,\n" +
            "    address        string,\n" +
            "    dt        String,\n" +
            "    PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "      'connector' = 'kafka'\n" +
            "      ,'topic' = 'join_topic2'\n" +
            "      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +
            "      ,'scan.startup.mode' = 'earliest-offset'\n" +
            "      ,'format' = 'json'\n" +
            "      ,'json.timestamp-format.standard' = 'SQL'\n" +
            "      )";

    return kafkaSource2;
}

● 執行插入邏輯1

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,age,dt) " +
        "select id, name,age,dt from source1";

● 通過 spark 查詢數據

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 NULL 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 NULL 1

● 執行插入邏輯2

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,address,dt) " +
        "select id, name, address,dt from source2";

● 運行成功

運行成功後在 spark 中查詢對應的表數據:

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 xc:address45 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 xc:address30 1

可以發現在第二次數據運行之後,表數據的對應欄位 address 已經更新,達到了類似在 Flink 中直接執行 join 的效果。

`insert into hudi_catalog.flink_db.test_hudi_flink_join_2

select a.id, a.name, a.age,b.address a.dt from source1 a left join source2 b on a.id = b.id `

《數棧產品白皮書》:https://www.dtstack.com/resources/1004?src=szsm

《數據治理行業實踐白皮書》下載地址:https://www.dtstack.com/resources/1001?src=szsm

想瞭解或咨詢更多有關袋鼠雲大數據產品、行業解決方案、客戶案例的朋友,瀏覽袋鼠雲官網:https://www.dtstack.com/?src=szbky

同時,歡迎對大數據開源項目有興趣的同學加入「袋鼠雲開源框架釘釘技術qun」,交流最新開源技術信息,qun號碼:30537511,項目地址:https://github.com/DTStack


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 官網下載 Java 您可以從官方網站下載 Java 的最新穩定版本。 官網地址:https://www.oracle.com/technetwork/java/javase/overview/index.html 安裝 Java 有些電腦可能已經安裝了Java。 要檢查Windows PC上是否安裝 ...
  • 小白一枚,今天來給大家分享一下如何將自己編寫的桌面應用程式進行打包,便於在其他的電腦上進行安裝使用。如有錯誤之處請大家指正,謝謝!好了,廢話不多說開乾。 這裡簡要的介紹一下使用QtCreator編寫的應用程式的打包。 1. 將Debug模式切換稱為ewlease模式進行編譯,編譯後沒有任何錯誤即可( ...
  • 開發板:NanoPC-T4開發板eMMC:16GBLPDDR3:4GB顯示屏:15.6 HDMI介面顯示屏u-boot :2023.04 在前面我們已經介紹了編譯Rockchip官方提供的uboot源碼,並下載到開發板中進行測試運行。這一節我們嘗試下載最新的uboot版本試試,當前最新版本為2023 ...
  • > 本文首發於公眾號:Hunter後端 > 原文鏈接:[es筆記六之聚合操作之指標聚合](https://mp.weixin.qq.com/s/UyiZ2bzFxi7zCGmL1Xf3CQ) 聚合操作,在 es 中的聚合可以分為大概四種聚合: * bucketing(桶聚合) * mertic(指標 ...
  • 原文地址:[https://blog.fanscore.cn/a/53/](https://blog.fanscore.cn/a/53/) # 1. 前言 本文是[與世界分享我剛編的轉發ntunnel_mysql.php的工具](https://blog.fanscore.cn/a/47/)的後續, ...
  • 這個資料庫主要包含兩個表,考慮到原破解APP數據就是這樣也就不折分了,一個是有5186條記錄的腦筯急轉彎表,一個是有18326條記錄的謎語表,兩個表中的記錄都有詳細的分類欄位,具體看截圖下的分類統計。 腦筋急轉彎分類統計:燈謎(79)、動物(81)、兒童(190)、搞笑(77)、經典(110)、冷笑 ...
  • 今天這個數據是一款字典的類型的軟體,專門用來查詢一些學術上面的名詞的中英對照,超過180個學科分類,150多萬條記錄,伴隨您悠游於學海之中,是您做學問、寫論文的好幫手。 主要科目有:電子計算機名詞(107213)、電機工程名詞(100395)、電力工程(68379)、外國地名譯名(64487)、機械 ...
  • 本文是How to find the TLS used for the SQL Server connection這篇英語文章的翻譯,此文出處請見於文章底部鏈接:原文出處[1] 對於客戶,我做了一些研究,如何找出SQL Server資料庫會話連接使用了哪一種TLS協議。唯一的方式就是創建一個擴展事件 ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...