在之前介紹過數據倉庫中的歷史拉鏈表《極限存儲–歷史拉鏈表》, 使用這種方式即可以記錄歷史,而且最大程度的節省存儲。這裡簡單介紹一下這種歷史拉鏈表的更新方法。 本文中假設: CREATE TABLE orders ( orderid INT, createtime STRING, modifiedti ...
在之前介紹過數據倉庫中的歷史拉鏈表《極限存儲–歷史拉鏈表》,
使用這種方式即可以記錄歷史,而且最大程度的節省存儲。這裡簡單介紹一下這種歷史拉鏈表的更新方法。
本文中假設:
- 數據倉庫中訂單歷史表的刷新頻率為一天,當天更新前一天的增量數據;
- 如果一個訂單在一天內有多次狀態變化,則只會記錄最後一個狀態的歷史;
- 訂單狀態包括三個:創建、支付、完成;
- 創建時間和修改時間只取到天,如果源訂單表中沒有狀態修改時間,那麼抽取增量就比較麻煩,需要有個機制來確保能抽取到每天的增量數據;
- 本文中的表和SQL都使用Hive的HQL語法;
- 源系統中訂單表結構為:
CREATE TABLE orders (
orderid INT,
createtime STRING,
modifiedtime STRING,
status STRING
) stored AS textfile;
7.在數據倉庫的ODS層,有一張訂單的增量數據表,按天分區,存放每天的增量數據:
CREATE TABLE t_ods_orders_inc (
orderid INT,
createtime STRING,
modifiedtime STRING,
status STRING
) PARTITIONED BY (day STRING)
stored AS textfile;
8. 在數據倉庫的DW層,有一張訂單的歷史數據拉鏈表,存放訂單的歷史狀態數據:
CREATE TABLE t_dw_orders_his (
orderid INT,
createtime STRING,
modifiedtime STRING,
status STRING,
dw_start_date STRING,
dw_end_date STRING
) stored AS textfile;
9. 暫未考慮Hive上表的查詢性能問題,只實現功能;
10. 2015-08-21至2015-08-23,每天原系統訂單表的數據如下,紅色標出的為當天發生變化的訂單,即增量數據:
全量初始化
在數據從源業務系統每天正常抽取和刷新到DW訂單歷史表之前,需要做一次全量的初始化,就是從源訂單表中昨天以前的數據全部抽取到ODW,並刷新到DW。
以上面的數據為例,比如在2015-08-21這天做全量初始化,那麼我需要將包括2015-08-20之前的所有的數據都抽取並刷新到DW:
第一步,抽取全量數據到ODS:
INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = ‘2015-08-20′)
SELECT orderid,createtime,modifiedtime,status
FROM orders
WHERE createtime <= ‘2015-08-20′;
第二步,從ODS刷新到DW:
INSERT overwrite TABLE t_dw_orders_his
SELECT orderid,createtime,modifiedtime,status,
createtime AS dw_start_date,
‘9999-12-31′ AS dw_end_date
FROM t_ods_orders_inc
WHERE day = ‘2015-08-20′;
完成後,DW訂單歷史表中數據如下:
- spark-sql> select * from t_dw_orders_his;
- 1 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
- 2 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
- 3 2015-08-19 2015-08-21 支付 2015-08-19 9999-12-31
- 4 2015-08-19 2015-08-21 完成 2015-08-19 9999-12-31
- 5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
- 6 2015-08-20 2015-08-20 創建 2015-08-20 9999-12-31
- 7 2015-08-20 2015-08-21 支付 2015-08-20 9999-12-31
- Time taken: 2.296 seconds, Fetched 7 row(s)
增量抽取
每天,從源系統訂單表中,將前一天的增量數據抽取到ODS層的增量數據表。
這裡的增量需要通過訂單表中的創建時間和修改時間來確定:
INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = ‘${day}‘)
SELECT orderid,createtime,modifiedtime,status
FROM orders
WHERE createtime = ‘${day}’ OR modifiedtime = ‘${day}';
註意:在ODS層按天分區的增量表,最好保留一段時間的數據,比如半年,為了防止某一天的數據有問題而回滾重做數據。
增量刷新曆史數據
從2015-08-22開始,需要每天正常刷新前一天(2015-08-21)的增量數據到歷史表。
第一步,通過增量抽取,將2015-08-21的數據抽取到ODS:
INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = ‘2015-08-21′)
SELECT orderid,createtime,modifiedtime,status
FROM orders
WHERE createtime = ‘2015-08-21′ OR modifiedtime = ‘2015-08-21′;
ODS增量表中2015-08-21的數據如下:
- spark-sql> select * from t_ods_orders_inc where day = '2015-08-21';
- 3 2015-08-19 2015-08-21 支付 2015-08-21
- 4 2015-08-19 2015-08-21 完成 2015-08-21
- 7 2015-08-20 2015-08-21 支付 2015-08-21
- 8 2015-08-21 2015-08-21 創建 2015-08-21
- Time taken: 0.437 seconds, Fetched 4 row(s)
第二步,通過DW歷史數據(數據日期為2015-08-20),和ODS增量數據(2015-08-21),刷新曆史表:
先把數據放到一張臨時表中:
- DROP TABLE IF EXISTS t_dw_orders_his_tmp;
- CREATE TABLE t_dw_orders_his_tmp AS
- SELECT orderid,
- createtime,
- modifiedtime,
- status,
- dw_start_date,
- dw_end_date
- FROM (
- SELECT a.orderid,
- a.createtime,
- a.modifiedtime,
- a.status,
- a.dw_start_date,
- CASE WHEN b.orderid IS NOT NULL AND a.dw_end_date > '2015-08-21' THEN '2015-08-20' ELSE a.dw_end_date END AS dw_end_date
- FROM t_dw_orders_his a
- left outer join (SELECT * FROM t_ods_orders_inc WHERE day = '2015-08-21') b
- ON (a.orderid = b.orderid)
- UNION ALL
- SELECT orderid,
- createtime,
- modifiedtime,
- status,
- modifiedtime AS dw_start_date,
- '9999-12-31' AS dw_end_date
- FROM t_ods_orders_inc
- WHERE day = '2015-08-21'
- ) x
- ORDER BY orderid,dw_start_date;
其中:
UNION ALL的兩個結果集中,第一個是用歷史表left outer join 日期為 ${yyy-MM-dd} 的增量,能關聯上的,並且dw_end_date > ${yyy-MM-dd},說明狀態有變化,則把原來的dw_end_date置為(${yyy-MM-dd} – 1), 關聯不上的,說明狀態無變化,dw_end_date無變化。
第二個結果集是直接將增量數據插入歷史表。
最後把臨時表中數據插入歷史表:
INSERT overwrite TABLE t_dw_orders_his
SELECT * FROM t_dw_orders_his_tmp;
刷新完後,歷史表中數據如下
- spark-sql> select * from t_dw_orders_his order by orderid,dw_start_date;
- 1 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
- 2 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
- 3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
- 3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
- 4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
- 4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
- 5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
- 6 2015-08-20 2015-08-20 創建 2015-08-20 9999-12-31
- 7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
- 7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
- 8 2015-08-21 2015-08-21 創建 2015-08-21 9999-12-31
- Time taken: 0.717 seconds, Fetched 11 row(s)
由於在2015-08-21做了8月20日以前的數據全量初始化,而訂單3、4、7在2015-08-21的增量數據中也存在,因此都有兩條記錄,但不影響後面的查詢。
再看將2015-08-22的增量數據刷新到歷史表:
- INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = '2015-08-22')
- SELECT orderid,createtime,modifiedtime,status
- FROM orders
- WHERE createtime = '2015-08-22' OR modifiedtime = '2015-08-22';
- DROP TABLE IF EXISTS t_dw_orders_his_tmp;
- CREATE TABLE t_dw_orders_his_tmp AS
- SELECT orderid,
- createtime,
- modifiedtime,
- status,
- dw_start_date,
- dw_end_date
- FROM (
- SELECT a.orderid,
- a.createtime,
- a.modifiedtime,
- a.status,
- a.dw_start_date,
- CASE WHEN b.orderid IS NOT NULL AND a.dw_end_date > '2015-08-22' THEN '2015-08-21' ELSE a.dw_end_date END AS dw_end_date
- FROM t_dw_orders_his a
- left outer join (SELECT * FROM t_ods_orders_inc WHERE day = '2015-08-22') b
- ON (a.orderid = b.orderid)
- UNION ALL
- SELECT orderid,
- createtime,
- modifiedtime,
- status,
- modifiedtime AS dw_start_date,
- '9999-12-31' AS dw_end_date
- FROM t_ods_orders_inc
- WHERE day = '2015-08-22'
- ) x
- ORDER BY orderid,dw_start_date;
- INSERT overwrite TABLE t_dw_orders_his
- SELECT * FROM t_dw_orders_his_tmp;
刷新完後歷史表數據如下:
- spark-sql> select * from t_dw_orders_his order by orderid,dw_start_date;
- 1 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
- 1 2015-08-18 2015-08-22 支付 2015-08-22 9999-12-31
- 2 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
- 2 2015-08-18 2015-08-22 完成 2015-08-22 9999-12-31
- 3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
- 3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
- 4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
- 4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
- 5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
- 6 2015-08-20 2015-08-20 創建 2015-08-20 2015-08-21
- 6 2015-08-20 2015-08-22 支付 2015-08-22 9999-12-31
- 7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
- 7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
- 8 2015-08-21 2015-08-21 創建 2015-08-21 2015-08-21
- 8 2015-08-21 2015-08-22 支付 2015-08-22 9999-12-31
- 9 2015-08-22 2015-08-22 創建 2015-08-22 9999-12-31
- 10 2015-08-22 2015-08-22 支付 2015-08-22 9999-12-31
- Time taken: 0.66 seconds, Fetched 17 row(s)
查看2015-08-21的歷史快照數據:
- spark-sql> select * from t_dw_orders_his where dw_start_date <= '2015-08-21' and dw_end_date >= '2015-08-21';
- 1 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
- 2 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
- 3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
- 4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
- 5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
- 6 2015-08-20 2015-08-20 創建 2015-08-20 2015-08-21
- 7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
- 8 2015-08-21 2015-08-21 創建 2015-08-21 2015-08-21
訂單1在2015-08-21的時候還處於創建的狀態,在2015-08-22的時候狀態變為支付。
再刷新2015-08-23的增量數據:
按照上面的方法刷新完後,歷史表數據如下:
- spark-sql> select * from t_dw_orders_his order by orderid,dw_start_date;
- 1 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
- 1 2015-08-18 2015-08-22 支付 2015-08-22 2015-08-22
- 1 2015-08-18 2015-08-23 完成 2015-08-23 9999-12-31
- 2 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
- 2 2015-08-18 2015-08-22 完成 2015-08-22 9999-12-31
- 3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
- 3 2015-08-19 2015-08-21 支付 2015-08-21 2015-08-22
- 3