我們前面採集的日誌數據已經保存到 Kafka 中,作為日誌數據的 ODS 層,從 Kafka 的ODS 層讀取的日誌數據分為 3 類, 頁面日誌、啟動日誌和曝光日誌。這三類數據雖然都是用戶行為數據,但是有著完全不一樣的數據結構,所以要拆分處理。將拆分後的不同的日誌寫回 Kafka 不同主題中,作為日 ...
我們前面採集的日誌數據已經保存到 Kafka 中,作為日誌數據的 ODS 層,從 Kafka 的ODS 層讀取的日誌數據分為 3 類, 頁面日誌、啟動日誌和曝光日誌。這三類數據雖然都是用戶行為數據,但是有著完全不一樣的數據結構,所以要拆分處理。將拆分後的不同的日誌寫回 Kafka 不同主題中,作為日誌 DWD 層。
流頁面日誌輸出到主流,啟動日誌輸出到啟動側輸出流,曝光日誌輸出到曝光側輸出流
識別新老用戶
本身客戶端業務有新老用戶的標識,但是不夠準確,需要用實時計算再次確認(不涉及業務操作,只是單純的做個狀態確認)。
啟動日誌
曝光日誌
頁面日誌
實現邏輯
- 獲取執行環境
- 消費 ods_base_log 主題數據創建流
- 將每行數據轉換為JSON對象(臟數據寫到側輸出流)
- 新老用戶校驗 狀態編程
- 分流 側輸出流 頁面:主流 啟動:側輸出流 曝光:側輸出流
- 提取側輸出流
- 將三個流進行列印並輸出到對應的Kafka主題中
- 啟動任務
# 啟動三個消費者,分別消費 dwd_start_log、dwd_page_log、dwd_display_log 主題
$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_page_log