DataLeap的Catalog系統近實時消息同步能力優化

来源:https://www.cnblogs.com/bytedata/archive/2022/09/20/16710630.html
-Advertisement-
Play Games

更多技術交流、求職機會,歡迎關註位元組跳動數據平臺微信公眾號,回覆【1】進入官方交流群 摘要 位元組數據中台DataLeap的Data Catalog系統通過接收MQ中的近實時消息來同步部分元數據。Apache Atlas對於實時消息的消費處理不滿足性能要求,內部使用Flink任務的處理方案在ToB場景 ...


更多技術交流、求職機會,歡迎關註位元組跳動數據平臺微信公眾號,回覆【1】進入官方交流群

 

摘要

位元組數據中台DataLeap的Data Catalog系統通過接收MQ中的近實時消息來同步部分元數據。Apache Atlas對於實時消息的消費處理不滿足性能要求,內部使用Flink任務的處理方案在ToB場景中也存在諸多限制,所以團隊自研了輕量級非同步消息處理框架,很好的支持了位元組內部和火山引擎上同步元數據的訴求。本文定義了需求場景,並詳細介紹框架的設計與實現。

背景

動機

位元組數據中台DataLeap的Data Catalog系統基於Apache Atlas搭建,其中Atlas通過Kafka獲取外部系統的元數據變更消息。在開源版本中,每台伺服器支持的Kafka Consumer數量有限,在每日百萬級消息體量下,經常有長延時等問題,影響用戶體驗。

在2020年底,我們針對Atlas的消息消費部分做了重構,將消息的消費和處理從後端服務中剝離出來,並編寫了Flink任務承擔這部分工作,比較好的解決了擴展性和性能問題。然而,到2021年年中,團隊開始重點投入私有化部署和火山公有雲支持,對於Flink集群的依賴引入了可維護性的痛點。

在仔細的分析了使用場景和需求,並調研了現成的解決方案後,我們決定投入人力自研一個消息處理框架。當前這個框架很好的支持了位元組內部以及ToB場景中Data Catalog對於消息消費和處理的場景。

本文會詳細介紹框架解決的問題,整體的設計,以及實現中的關鍵決定。

需求定義

使用下麵的表格將具體場景定義清楚。

相關工作

在啟動自研之前,我們評估了兩個比較相關的方案,分別是Flink和Kafka Streaming。

Flink是我們之前生產上使用的方案,在能力上是符合要求的,最主要的問題是長期的可維護性。在公有雲場景,那個階段Flink服務在火山雲上還沒有發佈,我們自己的服務又有嚴格的時間線,所以必須考慮替代;在私有化場景,我們不確認客戶的環境一定有Flink集群,即使部署的數據底座中帶有Flink,後續的維護也是個頭疼的問題。另外一個角度,作為通用流式處理框架,Flink的大部分功能其實我們並沒有用到,對於單條消息的流轉路徑,其實只是簡單的讀取和處理,使用Flink有些“殺雞用牛刀”了。

另外一個比較標準的方案是Kafka Streaming。作為Kafka官方提供的框架,對於流式處理的語義有較好的支持,也滿足我們對於輕量的訴求。最終沒有採用的主要考慮點是兩個:

  • 對於Offset的維護不夠靈活:我們的場景不能使用自動提交(會丟消息),而對於同一個Partition中的數據又要求一定程度的並行處理,使用Kafka Streaming的原生介面較難支持。

  • 與Kafka強綁定:大部分場景下,我們團隊不是元數據消息隊列的擁有者,也有團隊使用RocketMQ等提供元數據變更,在應用層,我們希望使用同一套框架相容。

設計

概念說明

  • MQ Type:Message Queue的類型,比如Kafka與RocketMQ。後續內容以Kafka為主,設計一定程度相容其他MQ。

  • Topic:一批消息的集合,包含多個Partition,可以被多個Consumer Group消費。

  • Consumer Group:一組Consumer,同一Group內的Consumer數據不會重覆消費。

  • Consumer:消費消息的最小單位,屬於某個Consumer Group。

  • Partition:Topic中的一部分數據,同一Partition內消息有序。同一Consumer Group內,一個Partition只會被其中一個Consumer消費。

  • Event:由Topic中的消息轉換而來,部分屬性如下。

    Event Type:消息的類型定義,會與Processor有對應關係;

    Event Key:包含消息Topic、Partition、Offset等元數據,用來對消息進行Hash操作;

  • Processor:消息處理的單元,針對某個Event Type定製的業務邏輯。

  • Task:消費消息並處理的一條Pipeline,Task之間資源是相互獨立的。

框架架構

整個框架主要由MQ Consumer, Message Processor和State Manager組成。

  • MQ Consumer:負責從Kafka Topic拉取消息,並根據Event Key將消息投放到內部隊列,如果消息需要延時消費,會被投放到對應的延時隊列;該模塊還負責定時查詢State Manager中記錄的消息狀態,並根據返回提交消息Offset;上報與消息消費相關的Metric。

  • Message Processor:負責從隊列中拉取消息並非同步進行處理,它會將消息的處理結果更新給State Manager,同時上報與消息處理相關的Metric。

  • State Manager:負責維護每個Kafka Partition的消息狀態,並暴露當前應提交的Offset信息給MQ Consumer。

實現

線程模型

 

 

每個Task可以運行在一臺或多台實例,建議部署到多台機器,以獲得更好的性能和容錯能力。

每台實例中,存在兩組線程池:

  • Consumer Pool:負責管理MQ Consumer Thread的生命周期,當服務啟動時,根據配置拉起一定規模的線程,併在服務關閉時確保每個Thread安全退出或者超時停止。整體有效Thread的上限與Topic的Partition的總數有關。

  • Processor Pool:負責管理Message Processor Thread的生命周期,當服務啟動時,根據配置拉起一定規模的線程,併在服務關閉時確保每個Thread安全退出或者超時停止。可以根據Event Type所需要處理的並行度來靈活配置。

兩類Thread的性質分別如下:

  • Consumer Thread:每個MQ Consumer會封裝一個Kafka Consumer,可以消費0個或者多個Partition。根據Kafka的機制,當MQ Consumer Thread的個數超過Partition的個數時,當前Thread不會有實際流量。

  • Processor Thread:唯一對應一個內部的隊列,並以FIFO的方式消費和處理其中的消息。

StateManager

 

 

在State Manager中,會為每個Partition維護一個優先隊列(最小堆),隊列中的信息是Offset,兩個優先隊列的職責如下:

  • 處理中的隊列:一條消息轉化為Event後,MQ Consumer會調用StateManager介面,將消息Offset 插入該隊列。

  • 處理完的隊列:一條消息處理結束或最終失敗,Message Processor會調用StateManager介面,將消息Offset插入該隊列。

  1. MQ Consumer會周期性的檢查當前可以Commit的Offset,情況枚舉如下:

  • 處理中的隊列堆頂 < 處理完的隊列堆頂或者處理完的隊列為空:代表當前消費回來的消息還在處理過程中,本輪不做Offset提交。

  • 處理中的隊列堆頂 = 處理完的隊列堆頂:表示當前消息已經處理完,兩邊同時出隊,並記錄當前堆頂為可提交的Offset,重覆檢查過程。

  • 處理中的隊列堆頂 > 處理完的隊列堆頂:異常情況,通常是數據回放到某些中間狀態,將處理完的隊列堆頂出堆。

註意:當發生Consumer的Rebalance時,需要將對應Partition的隊列清空

KeyBy與Delay Processing的支持

因源頭的Topic和消息格式有可能不可控制,所以MQ Consumer的職責之一是將消息統一封裝為Event。

根據需求,會從原始消息中拼裝出Event Key,對Key取Hash後,相同結果的Event會進入同一個隊列,可以保證分區內的此類事件處理順序的穩定,同時將消息的消費與處理解耦,支持增大內部隊列數量來增加吞吐。

Event中也支持設置是否延遲處理屬性,可以根據Event Time延遲固定時間後處理,需要被延遲處理的事件會被髮送到有界延遲隊列中,有界延遲隊列的實現繼承了DelayQueue,限制DelayQueue長度, 達到限定值入隊會被阻塞。

異常處理

Processor在消息處理過程中,可能遇到各種異常情況,設計框架的動機之一就是為業務邏輯的編寫者屏蔽掉這種複雜度。Processor相關框架的邏輯會與State Manager協作,處理異常並充分暴露狀態。比較典型的異常情況以及處理策略如下:

  • 處理消息失敗:自動觸發重試,重試到用戶設置的最大次數或預設值後會將消息失敗狀態通知State Manager。

  • 處理消息超時:超時對於吞吐影響較大,且通常重試的效果不明顯,因此當前策略是不會對消息重試,直接通知State Manager 消息處理失敗。

  • 處理消息較慢:上游Topic存在Lag,Message Consumer消費速率大於Message Processor處理速率時,消息會堆積在隊列中,達到隊列最大長度,Message Consumer 會被阻塞在入隊操作,停止拉取消息,類似Flink框架中的背壓。

監控

為了方便運維,在框架層面暴露了一組監控指標,並支持用戶自定義Metrics。其中預設支持的Metrics如下表所示:

線上運維Case舉例

實際生產環境運行時,偶爾需要做些運維操作,其中最常見的是消息堆積和消息重放。

  1. 對於Conusmer Lag這類問題的處理步驟大致如下:

  • 查看Enqueue Time,Queue Length的監控確定服務內隊列是否有堆積。

  • 如果隊列有堆積,查看Process Time指標,確定是否是某個Processor處理慢,如果是,根據指標中的Tag 確定事件類型等屬性特征,判斷業務邏輯或者Key設置是否合理;全部Processor 處理慢,可以通過增加Processor並行度來解決。

  • 如果隊列無堆積,排除網路問題後,可以考慮增加Consumer並行度至Topic Partition 上限。

消息重放被觸發的原因通常有兩種,要麼是業務上需要重放部分數據做補全,要麼是遇到了事故需要修複數據。為了應對這種需求,我們在框架層面支持了根據時間戳重置Offset的能力。具體操作時的步驟如下:

  • 使用服務測暴露的API,啟動一臺實例使用新的Consumer GroupId: {newConsumerGroup} 從某個startupTimestamp開始消費

  • 更改全部配置中的 Consumer GroupId 為 {newConsumerGroup}

  • 分批重啟所有實例

總結

為瞭解決位元組數據中台DataLeap中Data Catalog系統消費近實時元數據變更的業務場景,我們自研了輕量級消息處理框架。當前該框架已在位元組內部生產環境穩定運行超過1年,並支持了火山引擎上的數據地圖服務的元數據同步場景,滿足了我們團隊的需求。

下一步會根據優先順序排期支持RocketMQ等其他消息隊列,並持續優化配置動態更新,監控報警,運維自動化等方面。

 

立即跳轉火山引擎大數據研發治理套件DataLeap官網瞭解詳情
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 上課筆記 文件系統結構 /根目錄 /bin/ 存放系統命令,普通用戶與root都可以執行 /etc/ 配置文件保存位置 /lib/ 系統調用的函數庫保存位置 /var/ 目錄用於存儲動態數據,例如緩存、日誌文件、軟體運行過程中產生的文件等 /home/ 普通用戶目錄 /proc/ 配置文件目錄 /r ...
  • 摘要:一鍵創建實驗環境,開發者通過實驗手冊指導,快速體驗華為雲IoT服務,在雲端即可實現雲服務的實踐、調測和驗證等開發流程。 本文分享自華為雲社區《物聯網雲上實驗上新,帶您深度體驗華為雲IoT服務》,作者:華為IoT雲服務。 華為雲IoT沙箱實驗新品上線,誠邀廣大開發者參與體驗。一鍵創建實驗環境,開 ...
  • EndNote X9 for Mac是一款非常值得推薦的文獻管理軟體,不僅可以讓您免於手動收集和整理您的研究資料和格式化書目的繁瑣工作,還可以讓您在與同事協調時更加輕鬆自如。讓你的團隊專註科研,更高效的共用文獻開展協作。 詳情:EndNote X9 for Mac(最好用的文獻管理軟體) 引文報告 ...
  • 提起標準 IO 庫,第一印象就是 printf/scanf,這有什麼可說的?但是一個流是如何處理寬窄字元集、緩存方式的?如何在程式內部將標準輸出重定向到文件?FILE* 與 fd 是如何相互轉換的?在處理大文件時 fseek/fseeko/fsetpos 有何區別?創建臨時文件時 tmpnam/te... ...
  • RedisInsight 是一個直觀而高效的 Redis GUI (可視化工具),它提供了設計、開發和優化 Redis 應用程式的功能,查詢、分析您的 Redis 數據並與之交互,簡化您的 Redis 應用程式開發 。(必備) RedisInsight 現在採用了基於流行的 Electron 框架的... ...
  • 2022-09-20 Redis——select Redis資料庫中的資料庫的個數為: 16個,使用0號資料庫開始的,到第15個資料庫結束。 在ubantu中,進入Redis客戶端的命令: redis-cli 在ubantu中,如果選擇第0-15個資料庫中的一個的命令,例如第0個: select 0 ...
  • 一、直播介紹 前幾期,我們為大家分享了ChunJun的數據還原、Hive事務表及傳輸模塊的一些內容,本期我們為大家分享ChunJun類載入原理與實現。 本次直播我們將從Java 類載入器解決類衝突基本思想、Flink 類載入器隔離的方案、ChunJun如何實現類載入器隔離及問題排查等方面為大家進行介 ...
  • 1.創建容器併進行持久化處理 #拉取鏡像 docker pull mysql:8.0.20 #啟動鏡像,用於拷貝配置文件到宿主機 docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0.20 #查 ...
一周排行
    -Advertisement-
    Play Games
  • 一、openKylin簡介 openKylin(開放麒麟) 社區是在開源、自願、平等和協作的基礎上,由基礎軟硬體企業、非營利性組織、社團組織、高等院校、科研機構和個人開發者共同創立的一個開源社區,致力於通過開源、開放的社區合作,構建桌面操作系統開源社區,推動Linux開源技術及其軟硬體生態繁榮發展。 ...
  • 簡介 Flurl是一個用於構建基於HTTP請求的C#代碼的庫。它的主要目的是簡化和優雅地處理網路請求(只用很少的代碼完成請求)。Flurl提供了一種簡單的方法來構建GET、POST、PUT等類型的請求,以及處理響應和異常。它還提供了一些高級功能,如鏈式調用、緩存請求結果、自動重定向等。本文將介紹Fl ...
  • 一:背景 1. 講故事 最近也挺奇怪,看到了兩起 CPU 爆高的案例,且誘因也是一致的,覺得有一些代表性,合併分享出來幫助大家來避坑吧,閑話不多說,直接上 windbg 分析。 二:WinDbg 分析 1. CPU 真的爆高嗎 這裡要提醒一下,別人說爆高不一定真的就是爆高,我們一定要拿數據說話,可以 ...
  • 剛開始寫文章,封裝Base基類的時候,添加了trycatch異常塊,不過當時沒有去記錄日誌,直接return了。有小伙伴勸我不要吃了Exception 其實沒有啦,項目剛開始,我覺得先做好整體結構比較好。像是蓋樓一樣。先把樓體建造出來,然後再一步一步的美化完善。 基礎的倉儲模式已經ok,Autofa ...
  • 框架目標 什麼是框架,框架能做到什麼? 把一個方向的技術研發做封裝,具備通用性,讓使用框架的開發者用起來很輕鬆。 屬性: 通用性 健壯性 穩定性 擴展性 高性能 組件化 跨平臺 從零開始-搭建框架 建立項目 主鍵查詢功能開發 綁定實體 一步一步的給大家推導: 一邊寫一邊測試 從零開始--搭建框架 1 ...
  • 大家好,我是沙漠盡頭的狼。 本方首發於Dotnet9,介紹使用dnSpy調試第三方.NET庫源碼,行文目錄: 安裝dnSpy 編寫示常式序 調試示常式序 調試.NET庫原生方法 總結 1. 安裝dnSpy dnSpy是一款功能強大的.NET程式反編譯工具,可以對.NET程式進行反編譯,代替庫文檔的功 ...
  • 在`Windows`操作系統中,每個進程的虛擬地址空間都被劃分為若幹記憶體塊,每個記憶體塊都具有一些屬性,如記憶體大小、保護模式、類型等。這些屬性可以通過`VirtualQueryEx`函數查詢得到。該函數可用於查詢進程虛擬地址空間中的記憶體信息的函數。它的作用類似於`Windows`操作系統中的`Task... ...
  • 背景介紹 1,最近有一個大數據量插入的操作入庫的業務場景,需要先做一些其他修改操作,然後在執行插入操作,由於插入數據可能會很多,用到多線程去拆分數據並行處理來提高響應時間,如果有一個線程執行失敗,則全部回滾。 2,在spring中可以使用@Transactional註解去控制事務,使出現異常時會進行 ...
  • 線程(thread)是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際 運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以併發多個線程,每條線 程並行執行不同的任務。 ...
  • 發現Java 21的StringBuilder和StringBuffer中多了repeat方法: /** * @throws IllegalArgumentException {@inheritDoc} * * @since 21 */ @Override public StringBuilder ...