數據開發基本都是從陌生到熟悉,但是寫多了就會發現各種好用的工具/函數,也會發現各種坑,本文分享了作者從拿到數據到數據開發到數據監控的一些實操經驗。 ...
數據開發基本都是從陌生到熟悉,但是寫多了就會發現各種好用的工具/函數,也會發現各種坑,本文分享了作者從拿到數據到數據開發到數據監控的一些實操經驗。
寫在前面
各種join的用法篇
|
|
|
|
|
|
|
|
|
|
|
|
read zhule_b;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- better way to perform join, select small range of data first.
SELECT A.*, B.*
FROM
(SELECT * FROM A WHERE ds='20180101') A
JOIN
(SELECT * FROM B WHERE ds='20180101') B
ON a.key = b.key;
-
The left table in a LEFT OUTER JOIN operation must be a large table.
-
The right table in a RIGHT OUTER JOIN operation must be a large table.
-
MAPJOIN cannot be used in a FULL OUTER JOIN operation.
-
The left or right table in an INNER JOIN operation can be a large table.
SELECT /*+ MAPJOIN(b) */
a.*
FROM test_a a
JOIN test_b b
ON a.user_key = b.user_key
;
//就是在sql語句前加一個標記說這是mapjoin,把小表別名寫在括弧里
Left Join
-
一定要保留左表的內容是,可以選擇用left join,例如加入key_attrs
-
Right Join和Left Join沒有本質區別,建議定義好左表後都利用Left Join來執行
-
如果右表有重覆數據的情況,那麼最終left join的結果會有重覆
Left Semi Join
-
當右表沒有重覆數據時,和Join是一致的,只會保留相同的列下來
-
left semi join並不會返回右表B中的任何數據,所以你沒法在where條件中指定關於右表B的任何篩選條件,下麵得例子能夠有更加清晰的對比(例子引用於開源論壇):
employee (2 columns - e_id and e_name)
10, Tom
20, Jerry
30, Emily
employee_department_mapping (2 columns - e_id and d_name)
10, IT
10, Sales
20, HR
-- inner join results
SELECT e.e_id, e.e_name, d.d_name FROM
employee e INNER JOIN employee_department_mapping d
on e.e_id = d.e_id
-- results
10, Tom, IT
10, Tom, Sales
20, Jerry, HR
-- left semi join results
SELECT e.e_id, e.e_name, d.d_name FROM
employee e LEFT SEMI JOIN employee_department_mapping d
on e.e_id = d.e_id
-- results
10, Tom, IT
20, Jerry, HR
-
最好用的場景就是找出兩表的差異部分;
-
演算法日常調度時可以用於每日新增修改商品的提取,將關鍵欄位放到ON條件中就行
Full Join
-
在有增刪改情況下更新下游最新數據時,非常好用,但是知道的人比較少
SELECT COALESCE(a.main_image_url,b.main_image_url) AS main_image_url
,COALESCE(a.embedding,b.embedding) AS embedding
FROM today_feat a
FULL JOIN yest_feat b
ON a.main_image_url = b.main_image_url
其中full jion的效果如下,正好滿足new,old,updated feature的更新,配合COALESCE很絲滑:
合理設置Mapper和Reducepriority
set odps.instance.priority
set odps.sql.mapper.split.size
-- original sql
CREATE TABLE if not EXISTS tmp_zhl_test LIFECYCLE 1 AS
SELECT sig, venture, seller_id, COUNT(product_id) as cnt
FROM sku_main_image_sig
WHERE LENGTH(sig) > 10 --some bad cases may have weird sigs like '#NEXT#'
GROUP BY sig, venture, seller_id
HAVING cnt>2
;
set odps.sql.reducer.instances
set odps.sql.mapper(reducer).memory
在Python UDF中使用第三方庫
Upload&Call Package
-
需要下載第三方庫的安裝包xxx.whl,可以直接下載到自己的電腦上面,這樣可以在離線環境驗證多個版本的一致性(下麵介紹)。一般來說我們需要去看安裝包需要的python版本號以及相容機器環境,一般來說都是cp37-cp37m or py2.py3-none-any在中間,然後末尾是x86_64的安裝包;
-
本地直接將xxx.whl轉換為xxx.zip,利用命令「mv xxx.whl xxx.zip」就行
-
將zip資源文件上傳到ODPS對應的環境
-
在你的UDF中,利用下麵的代碼指定資源包的路徑和引用(直接copy就行)
def include_package_path(res_name, lib_name):
archive_files = get_cache_archive(res_name)
dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
if '.dist_info' not in f.name], key=lambda v: len(v))
for dir_name in dir_names:
if dir_name.endswith(lib_name):
sys.path.insert(0, os.path.dirname(dir_name))
break
elif os.path.exists(os.path.join(dir_name, lib_name + '.py')):
sys.path.insert(0, os.path.abspath(dir_name))
break
class PostProcess(BaseUDTF):
def __init__(self):
include_package_path('opencv_python-3.4.0.zip','cv2')
include_package_path('numpy-1.16.6.zip','numpy')
-
python UDF寫完後,就可以在創建函數裡面的Resources里直接將你的資源名寫進去,這樣在流程啟動後,你的資源才會被有效調用起來。
-
python UDF預設的版本是2.x的,如果你的python版本是3.x,那麼需要在ODPS運行前加入下麵的指令;同時,部分功能是需要打開沙箱的,所以如果報錯的話,可以加入第二行的沙箱命令。
set odps.sql.python.version = cp37; --use python 3.7, default is 2.x version
set odps.isolation.session.enable = true;
Solve Compatibility Issue
-
在本地可以用類似conda的工具搭建一個虛擬環境
-
在本地用pip或者conda install來安裝你需要的三方庫
-
查詢你下載的三方庫以及依賴庫的版本,比如python-opencv的話可以列印cv2.__version__
-
把對應版本的xxx.whl包按照上面的方法現在下來並且上傳到ODPS資源中進行依賴
發佈任務時的一些額外建議
-
發佈任務配置時,可以靈活使用exclude和extra來去掉或添加你想要的依賴。其中exclude可以去掉你中間產出的臨時表,而extra可以幫你增加雖然不在代碼里但是希望依賴的上游表(這在彙總不同上游表數據寫入下游對應分區並且希望同時產出下游數據時很有用)。
--exclude input or output tables (especially those tmp tables)--@exclude_input=lsiqg_iqc_sku_product_detection_result--@exclude_output=lsmp_sku_image_url_bizdate
-- include input or output tables (especially those separate venture tables)--@extra_input=lsiqg_iqc_sku_product_detection_result--@extra_output=lsmp_sku_image_url_bizdate
-
如果在SQL代碼過程中你需要使用臨時表來過渡中間產出的數據(避免SQL嵌套過於嚴重,影響運行效率),建議一定在臨時表中加入一個時間戳,ex. lsiqg_iqc_input_tmp_${bizdate}不然在補數據或者遇到任務堵塞兩個任務同時在調度時,或產生overwrite的一系列問題。 -
如果存在上游表存在多個分區,但是每個分區處理邏輯一樣的話(比如不同國家的分區表處理邏輯其實一樣),非常建議在第一步里就把不同分區表的數據彙總起來,可以重新增加一個分區(如venture)來存放融合後的數據。如下示例:
INSERT OVERWRITE TABLE sku_main_image_sig PARTITION (ds = '${bizdate}',venture)
SELECT id
,image_url
,venture
FROM (
SELECT id
,image_url
,'ID' AS venture
FROM auction_image_id
WHERE ds = MAX_PT('auction_image_id')
UNION
SELECT id
,image_url
,'PH' AS venture
FROM auction_image_ph
WHERE ds = MAX_PT('auction_image_ph')
UNION
SELECT id
,image_url
,'VN' AS venture
FROM auction_image_vn
WHERE ds = MAX_PT('auction_image_vn')
UNION
SELECT id
,image_url
,'SG' AS venture
FROM auction_image_sg
WHERE ds = MAX_PT('auction_image_sg')
UNION
SELECT id
,image_url
,'MY' AS venture
FROM auction_image_my
WHERE ds = MAX_PT('auction_image_my')
UNION
SELECT id
,image_url
,'TH' AS venture
FROM auction_image_th
WHERE ds = MAX_PT('auction_image_th')
) union_table
;
-
對於重要的數據表,一定要設置監控,防止數據丟失、不正常產出等問題,具體的方法又可以分兩類:
-
設置任務基線(baseline)來保證任務優先順序,這樣調度的時間更有保障
-
設置warning的簡訊/電話或者DQC的監控規則來具體監控數據
簡單的任務可以直接在任務中心查看詳情中設置:
寫在最後
-
拿到數據第一時間驗證數據的重覆性,有效性;如果是組內問題就反饋,上游鏈路問題就自己過濾;
-
寫完數據的每一部分都先驗證合理性,這樣會提高最終數據的成功率;
-
一般節點上線後,會主動去觀察3-4天,確保輸出是符合預期的(如果會發現應該穩定的數據反而猛然增加or減少,那很可能是數據邏輯有問題);
-
定義合理的數據監控,可以避免數據為空,數據波動過大,數據欄位不合理等問題;
Enjoy Data Engineering!!
作者|周慧玲(逐樂)
本文來自博客園,作者:古道輕風,轉載請註明原文鏈接:https://www.cnblogs.com/88223100/p/ODPS-optimization-suggestions-for-technical-newcomers.html