聊聊流式數據湖Paimon(三)

来源:https://www.cnblogs.com/zhiyong-ITNote/archive/2023/12/25/17926519.html
-Advertisement-
Play Games

概述 如果表沒有定義主鍵,則預設情況下它是僅追加 表類型(Append Only Table)。 根據桶(Bucket)的定義,我們有兩種不同的僅追加模式:"Append For Scalable Table"和"Append For Queue";兩種模式支持不同的場景,提供不同的功能。 只能向表 ...


概述

如果表沒有定義主鍵,則預設情況下它是僅追加 表類型(Append Only Table)。 根據桶(Bucket)的定義,我們有兩種不同的僅追加模式:"Append For Scalable Table"和"Append For Queue";兩種模式支持不同的場景,提供不同的功能。
只能向表中插入一條完整的記錄。 不支持刪除或更新,並且不能定義主鍵。 此類表適合 不需要更新的用例(例如日誌數據同步)。

Append 場景特指"無主鍵"的場景,比如日誌數據的記錄,不具有直接Upsert更新的能力。

Append For Scalable Table

其支持的功能如下:

  1. 支持批讀批寫 INSERT OVERWRITE
  2. 支持流讀流寫 自動合併小文件
  3. 支持湖存儲特性 ACID、Time Travel
  4. order與z-order排序

Definition

通過在表屬性中定義 'bucket' = '-1',可以為此表分配特殊模式(我們稱之為"unaware-bucket 模式")。 在這種模式下,一切都不同了。 我們已經沒有了桶的概念,也不保證流式讀取的順序。 我們將此表視為批量離線表(儘管我們仍然可以流式讀寫)。 所有記錄都會進入一個目錄(為了相容性,我們將它們放在bucket-0中),並且我們不再維護順序。 由於我們沒有桶的概念,所以我們不會再按桶對輸入記錄進行混洗,這將加快插入速度。
使用此模式,可以將 Hive 表替換為 Lake 表。
image.png

Compaction

在unaware-bucket模式下,我們不在writer中進行壓縮,而是使用Compact Coordinator掃描小文件並將壓縮任務提交給Compact Worker。 這樣,我們就可以輕鬆地對一個簡單的數據目錄進行並行壓縮。 在流模式下,如果在flink中運行insert sql,拓撲將是這樣的:
image.png
它會儘力壓縮小文件,但是當一個分區中的單個小文件長時間保留並且沒有新文件添加到該分區時,壓縮協調器會將其從記憶體中刪除以減少記憶體使用。 重新啟動作業後,它將掃描小文件並將其再次添加到記憶體中。 控制緊湊行為的選項與 Append For Qeueue 完全相同。 如果將 write-only 設置為 true,Compact Coordinator 和 Compact Worker 將從拓撲中刪除。
自動壓縮僅在 Flink 引擎流模式下支持。還可以通過 paimon 中的 flink 操作在 flink 中啟動壓縮作業,並通過 set write-only 禁用所有其他壓縮。

Sort Compact

每個分區中的數據亂序會導致選擇緩慢,壓縮可能會減慢插入速度。 將插入作業設置為只寫是一個不錯的選擇,並且在每個分區數據完成後,觸發分區排序壓縮操作。

Streaming Source

Unaware-bucket模式 Append Only Table 支持流式讀寫,但不再保證順序。 你不能把它看作一個隊列,而是一個有bin的湖。每次提交都會生成一個新的binbin存儲記錄 來讀取增量,但是一個 bin 中的記錄會流向它們想要的任何地方,並且我們以任何可能的順序獲取它們。 在Append For Queue模式下,記錄不存儲在bin中,而是存儲在record pipe中。 記錄 存儲,我們可以通過讀取新的存儲記錄 來讀取增量,但是一個 bin 中的記錄會流向它們想要的任何地方,並且我們以任何可能的順序獲取它們。 在Append For Queue模式下,記錄不存儲在bin中,而是存儲在record pipe中。

bin:儲物箱

Streaming Multiple Partitions Write

由於Paimon-sink需要處理的寫入任務數量為:數據寫入的分區數量 * 每個分區的桶數量。 因此,我們需要儘量控制每個paimon-sink任務的寫任務數量,使其分佈在合理的範圍內。 如果每個sink-task處理過多的寫任務,不僅會導致小文件過多的問題,還可能導致記憶體不足的錯誤。
另外,寫入失敗會引入孤兒文件,這無疑增加了維護paimon的成本。 我們需要儘可能避免這個問題。
對於啟用自動合併的 flink-jobs,我們建議嘗試按照以下公式來調整 paimon-sink 的並行度(這不僅僅適用於append-only-tables,它實際上適用於大多數場景):

(N*B)/P < 100   (This value needs to be adjusted according to the actual situation)
N(the number of partitions to which the data is written)
B(bucket number)
P(parallelism of paimon-sink)
100 (This is an empirically derived threshold,For flink-jobs with auto-merge disabled, this value can be reduced.
However, please note that you are only transferring part of the work to the user-compaction-job, you still have to deal with the problem in essence,
the amount of work you have to deal with has not been reduced, and the user-compaction-job still needs to be adjusted according to the above formula.)

還可以將 write-buffer-spillable 設置為 true,writer 可以將記錄溢出到磁碟。 這可以儘可能地減少小文件。要使用此選項,的 flink 集群需要有一定大小的本地磁碟。 這對於那些在 k8s 上使用 flink 的人來說尤其重要。
對於僅追加表,您可以為僅追加表設置 write-buffer-for-append 選項。 將此參數設置為true,writer將使用Segment Pool緩存記錄以避免OOM。

Example

以下是創建Append-Only表並指定bucket key的示例。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT
) WITH (
  'bucket' = '-1'
);

Append For Queue

其支持的功能如下:

  1. 嚴格保證順序,可以帶消息隊列
  2. 支持Watermark且對齊
  3. 自動合併小文件
  4. 支持Consumer-ID (類似Group-ID)

Definition

在這種模式下,可以將append-only table看成是一個由bucket分隔的隊列。 同一個桶中的每條記錄都是嚴格排序的,流式讀取會嚴格按照寫入的順序將記錄傳輸到下游。 使用此模式,不需要進行特殊配置,所有數據都會以隊列的形式放入一個桶中。還可以定義bucketbucket-key以實現更大的並行性和分散數據。
image.png

Compaction

預設情況下,sink節點會自動進行compaction來控制文件數量。 以下選項控制壓縮策略:
image.png

Streaming Source

目前僅 Flink 引擎支持流式源行為。

Streaming Read Order

對於流式讀取,記錄按以下順序生成:

  • 對於來自兩個不同分區的任意兩條記錄
    • 如果 scan.plan-sort-partition 設置為 true,則首先生成分區值較小的記錄。
    • 否則,將先產生分區創建時間較早的記錄。
  • 對於來自同一分區、同一桶的任意兩條記錄,將首先產生第一條寫入的記錄。
  • 對於來自同一分區但兩個不同桶的任意兩條記錄,不同的桶由不同的任務處理,它們之間沒有順序保證。
Watermark Definition

定義讀取 Paimon 表的watermark:

CREATE TABLE T (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);

-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
 TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;

還可以啟用 Flink Watermark 對齊,這將確保沒有源/拆分/分片/分區將其 Watermark 增加得遠遠超出其他部分:
image.png

Bounded Stream

Streaming Source 也可以是有界的,指定 scan.bounded.watermark 來定義有界流模式的結束條件,流讀取將結束,直到遇到更大的 watermark 快照。
快照中的watermark 是由writer生成的,例如,指定kafka源並聲明watermark 的定義。當使用此kafka源寫入Paimon表時,Paimon表的快照將生成相應的watermark,以便流式讀取此Paimon表時可以使用有界watermark的功能。

CREATE TABLE kafka_table (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);

-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;

-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

Example

以下是創建Append-Only表並指定bucket key的示例。

CREATE TABLE MyTable (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'bucket' = '8',
    'bucket-key' = 'product_id'
);

參考

基於 Apache Paimon 的 Append 表處理
Apache Paimon 實時數據湖 Streaming Lakehouse 的存儲底座


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這道題目可以考慮,如果首碼是一樣的長度,那麼只需要兩個鏈表同時向後檢索,直到找到一樣的元素為止。所以應該先找到兩個鏈表的長度,然後將較長的一個鏈表的多出來的首碼部分刪掉,也就不去看這一部分。因為尾碼都是一樣的,所以長度的差異只可能來自首碼。 解決代碼: typedef struct Node{ ch ...
  • 編譯Keystone 根據github的文檔編譯不出來,所以還是用CMake項目轉成Visual Studio的項目來編譯 1、下載源碼 https://github.com/keystone-engine/keystone clone或者直接下載zip都行 2、CMake創建Visual Stud ...
  • 1.關鍵字(keyword) 定義:被Java語言賦予了特殊含義,用做專門用途的字元串(或單詞),這些字元串(或單詞)已經被Java定義好了。 特點:全部關鍵字都是小寫字母。 關鍵字查閱的官方地址: https://docs.oracle.com/javase/tutorial/java/nutsa ...
  • 概述:Dispatcher是WPF中用於協調UI線程和非UI線程操作的關鍵類,通過消息迴圈機制確保UI元素的安全更新。常見用途包括非同步任務中的UI更新和定時器操作。在實踐中,需註意避免UI線程阻塞、死鎖,並使用CheckAccess方法確保在正確的線程上執行操作。這有助於提升應用程式的性能和用戶體驗 ...
  • 上次購買XL2400是在10月份, 那時候還是XL2400, 但是最近這個型號已經被XL2400P代替了, 再買收到的就是XL2400P. 這兩個型號的差異不小, 在遷移到 XL2400P 的過程中遇到了一些坑, 因此把這些坑記錄一下, 避免後面使用的人浪費時間. ...
  • 本文記錄瞭如何零基礎通過 `BCC` 框架,入門 `eBPF` 程式的開發,並實現幾個簡易的程式。 拋磚引玉,如有論述錯誤之處,還請斧正。 ...
  • 一、雙主雙從架構介紹 在MySQL多主多從的架構配置中和雙主雙從是一樣的,學會了雙主雙從的架構部署,多主多從的配置也同樣就回了。下麵以雙主雙從作為示例演示。其中一個主機maste1用於處理所有寫請求,它的從機slave1和另外一臺主機master2還有它的從機salve2負責所有讀數據請求,當mas ...
  • 顧名思義, Extra 列是用來說明一些額外信息的, 我們可以通過這些額外信息來更準確的理解 MySQL 到底將如何執行給定的查詢語句。 MySQL 提供的額外信息很多。這裡單說 Using where。 Using where 只是表示 MySQL 使用 where 子句中的條件對記錄進行了過濾。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...