ETL的架構 ETL架構的優勢: ETL相對於EL-T架構可以實現更為複雜的數據轉化邏輯 ETL採用單獨的硬體伺服器,可以分擔資料庫系統的負載 ETL與底層的資料庫數據存儲無關,可以保持所有的數據始終在資料庫當中,避免數據的載入和導出,從而保證效率,提高系統的可監控性。 ELT主要通過資料庫引擎來實 ...
目錄
ETL的架構
ETL架構的優勢:
ETL相對於EL-T架構可以實現更為複雜的數據轉化邏輯
ETL採用單獨的硬體伺服器,可以分擔資料庫系統的負載
ETL與底層的資料庫數據存儲無關,可以保持所有的數據始終在資料庫當中,避免數據的載入和導出,從而保證效率,提高系統的可監控性。
ELT主要通過資料庫引擎來實現系統的可擴展性(尤其是當數據加工過程在晚上時,可以充分利用資料庫引擎的資源)
ELT可以根據數據的分佈情況進行並行處理優化,並可以利用資料庫的固有功能優化磁碟I/O。
ELT的可擴展性取決於資料庫引擎和其硬體伺服器的可擴展性。
通過對相關資料庫進行性能調優,ETL過程獲得3到4倍的效率提升一般不是特別困難。
離線 ETL 的架構設計
離線 ETL 採用 MapReduce 框架處理清洗不同業務的數據,主要是採用了分而治之的思想,能夠水平擴展數據清洗的能力;
graph LR 1[Input] --> 2[Map] --> 3[Output]如上圖所示,離線 ETL 分為三個模塊:
- Input(InputFormat):主要對數據來源(Kafka 數據)進行解析分片,按照一定策略分配到不同的 Map 進程處理;創建 RecordReader,用於對分片數據讀取解析,生成 key-value 傳送給下游處理。
- Map(Mapper):對 key-value 數據進行加工處理。
- Output (OutputFormat):創建 RecordWriter 將處理過的 key-value 數據按照庫、表、分區落地;最後在 commit 階段檢測消息處理的完整性。
離線 ETL 的模塊實現
數據分片(Split)
我們從 kafka 獲取當前 topic&partition 最大的 offset 以及上次消費的截止 offset ,組成本次要消費的[beginOffset、endOffset]kafkaEvent,kafkaEvent 會打散到各個 Mapper 進行處理,最終這些 offset 信息持久化到 mysql 表中。
那麼如何保證數據不傾斜呢?首先通過配置自定義 mapper 個數,並創建對應個數的 ETLSplit。由於 kafkaEevent 包含了單個 topic&partition 之前消費的 Offset 以及將要消費的最大 Offset,即可獲得每個 kafkaEvent 需要消費的消息總量。最後遍歷所有的 kafkaEevent,將當前 kafkaEevent 加入當前最小的 ETLSplit(通過比較需要消費的數據量總和,即可得出),通過這樣生成的 ETLSplit 能儘量保證數據均衡。
數據解析清洗(Read)
如上圖所示,首先每個分片會有對應的 RecordReader 去解析,RecordReade 內包含多個 KafkaConsumerReader ,就是對每個 KafkaEevent 進行消費。每個 KafkaEevent 會對應一個 KafkaConsumer,拉取了位元組數據消息之後需要對此進行 decode 反序列化,此時就涉及到 MessageDecoder 的結構。MessageDecoder 目前支持三種格式:
格式 | 涉及 topic |
---|---|
Avro | android、ios、ad_sdk_android... |
Json | app-server-meipai、anti-spam... |
DelimiterText | app-server-youyan、app-server-youyan-im... |
MessageDecoder 接收到 Kafka 的 key 和 value 時會對它們進行反序列化,最後生成 ETLKey 和 ETLValue。同時 MessageDecoder 內包含了 Injector,它主要做瞭如下事情:
- 註入 Aid:針對 arachnia agent 採集的日誌數據,解析 KafkaKey 註入日誌唯一標識 Aid;
- 註入 GeoIP 信息:根據 GeoIP 解析 ip 信息註入地理信息(如 country_id、province_id、city_id);
- 註入 SdkDeviceInfo: 本身實時流 ETL 會做註入 gid、is_app_new 等信息,但是離線 ETL 檢測這些信息是否完整,做進一步保障。
過程中還有涉及到 DebugFilter,它將 SDK 調試設備的日誌過濾,不落地到 HDFS。
多文件落地(Write)
由於 MapReduce 本身的 RecordWriter 不支持單個落地多個文件,需要特殊處理,並且 HDFS 文件是不支持多個進程(線程)writer、append,於是我們將KafkaKey+ 業務分區+ 時間分區 + Kafka partition定義一個唯一的文件,每個文件都是會到帶上 kafka partition 信息。同時對每個文件創建一個RecordWriter。
每個 RecordWriter 包含多個 Writer ,每個 Writer 對應一個文件,這樣可以避免同一個文件多線程讀寫。目前是通過 guava cache 維護 writer 的數量,如果 writer 太多或者太長時間沒有寫訪問就會觸發 close 動作,待下批有對應目錄的 kafka 消息在創建 writer 進行 append 操作。這樣我們可以做到在同一個 map 內對多個文件進行寫入追加。
檢測數據消費完整性 (Commit)
MapReduce Counter 為提供我們一個視窗,觀察統計 MapReduce job 運行期的各種細節數據。並且它自帶了許多預設 Counter,可以檢測數據是否完整消費:
reader_records: 解析成功的消息條數;
decode_records_error: 解析失敗的消息條數;
writer_records: 寫入成功的消息條數;
...
最後通過本次要消費 topic offset 數量、reader_records 以及 writer_records 數量是否一致,來確認消息消費是否完整。
允許一定比例的臟數據,若超出限度會生成簡訊告警
參考鏈接
https://blog.csdn.net/javastart/article/details/113838240