我的Spark SQL單元測試實踐

来源:https://www.cnblogs.com/hhelibeb/archive/2019/03/21/10534862.html
-Advertisement-
Play Games

最近加入一個Spark項目,作為臨時的開發人員協助進行開發工作。該項目中不存在測試的概念,開發人員按需求進行編碼工作後,直接向生產系統部署,再由需求的提出者在生產系統檢驗程式運行結果的正確性。在這種原始的工作方式下,產品經理和開發人員總是在生產系統驗證自己的需求、代碼。可以想見,各種直接交給用戶的錯 ...


最近加入一個Spark項目,作為臨時的開發人員協助進行開發工作。該項目中不存在測試的概念,開發人員按需求進行編碼工作後,直接向生產系統部署,再由需求的提出者在生產系統檢驗程式運行結果的正確性。在這種原始的工作方式下,產品經理和開發人員總是在生產系統驗證自己的需求、代碼。可以想見,各種直接交給用戶的錯誤導致了一系列的事故和不信任。為了處理各類線上問題,大家都疲於奔命。當工作進行到後期,每一個相關人都已經意氣消沉,常常對工作避之不及。

為了改善局面,我嘗試了重構部分代碼,將連篇的SQL分散到不同的方法里,並對單個方法構建單元測試。目的是,在編碼完成後,首先在本地執行單元測試,以實現:

  1. 部署到生產系統的代碼中無SQL語法錯誤。
  2. 將已出現的bug寫入測試用例,避免反覆出現相同的bug。
  3. 提前發現一些錯誤,減少影響到後續環節的問題。
  4. 通過自動化減少開發和處理程式問題的總時間花費。
  5. 通過流程和結果的改善,減少開發人員的思維負擔,增加與其他相關人的互信。

本文將介紹我的Spark單元測試實踐,供大家參考、批評。本文中的Spark API是PySpark,測試框架為pytest。

對於希望將本文當作單元測試教程使用的讀者,本文會假定讀者已經準備好了開發和測試所需要的環境,如果沒有也沒有關係,文末的參考部分會包含一些配置環境相關的鏈接。

 

本文鏈接:https://www.cnblogs.com/hhelibeb/p/10534862.html

轉載請註明

概念

定義

單元測試是一種測試方法,它的對象是單個程式單元/組件,目的是驗證軟體的每個組件都符合設計要求。

單元是軟體中最小的可測試部分。它通常包含一些輸入和單一的輸出。

本文中的單元就是python函數(function)。

單元測試通常是程式開發人員的工作。

原則

為了實現單元測試,函數最好符合一個條件,

  • 對於相同的輸入,函數總有相同的輸出。

這要求函數內部不能存在“副作用”。

它的輸出結果的確定不應該依賴輸入參數外的任何內容,例如,不可以因為本地測試環境中沒有相應的資料庫就產生“連接資料庫異常”導致無法返回結果。

它也不應該改變除了返回結果以外的任何內容,例如,不可以改變全局可變狀態。

代碼實踐

下麵是數據和程式部分。

數據

假設我們的服務對象是一家水果運銷公司,公司在不同城市設有倉庫,現有三張表,其中inventory包含水果的總庫存數量信息,inventory_ratio包含水果在不同城市的應有比例,

目標是根據總庫存數量和比例算出水果在各地的庫存,寫入到第三張表inventory_city中。三張表的列如下,

1. inventory. Columns: “item”, “qty”.
2. inventory_ratio. Columns: “item”, “city”, “ratio”.
3. inventory_city. Columns: “item”, “city”, “qty”.

第一版代碼

用最直接的方式實現這一功能,代碼將是,

from pyspark.sql import SparkSession

if __name__ == "__main__":

    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

    result = spark.sql('''select t1.item, t2.city,
                                 case when t2.ratio is not null then t1.qty * t2.ratio 
                                      else                           t1.qty
                                 end as qty     
                          from      v_inventory as t1
                          left join v_ratio     as t2 on t1.item = t2.item ''')

    result.write.csv(path="somepath/inventory_city", mode="overwrite")

 

這段代碼可以實現計算各城市庫存的需求,但測試起來會不太容易。特別是如果未來我們還要在這個程式中增加其他邏輯的話,不同的邏輯混雜在一起,測試和修改都會變得麻煩。

所以,在下一步,我們要將部分代碼封裝到一個函數中。

有副作用的函數

創建一個名為get_inventory_city的函數,將代碼包含在內,

from pyspark.sql import SparkSession

def get_inventory_city():
    
    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
    
    result = spark.sql('''select t1.item, t2.city,
                                 case when t2.ratio is not null then t1.qty * t2.ratio 
                                      else                           t1.qty
                                 end as qty     
                          from      v_inventory as t1
                          left join v_ratio     as t2 on t1.item = t2.item ''')
    result.write.csv(path="somepath/inventory_city", mode="overwrite") if __name__ == "__main__": get_inventory_city()

顯然,這是一個不太易於測試的函數,因為它,

  • 沒有輸入輸出參數,不能直接根據給定數據檢驗運行結果。
  • 包含對資料庫的讀/寫,這意味著它要依賴外部資料庫。
  • 包含對spark session的獲取/創建,這和計算庫存的邏輯也毫無關係。

我們把這些函數中的多餘的東西稱為副作用。副作用和函數的核心邏輯糾纏在一起,使單元測試變得困難,也不利於代碼的模塊化。

我們必須另外管理副作用,只在函數內部保留純邏輯

無副作用的函數

按照上文中提到的原則,重新設計函數,可以得到,

from pyspark.sql import SparkSession, DataFrame

def get_inventory_city(spark: SparkSession, inventory: DataFrame, ratio: DataFrame):

    inventory.createOrReplaceTempView('v_inventory')
    ratio.createOrReplaceTempView('v_ratio')

    result = spark.sql('''select t1.item, t2.city,
                                 case when t2.ratio is not null then t1.qty * t2.ratio 
                                      else                           t1.qty
                                 end as qty     
                          from      v_inventory as t1
                          left join v_ratio     as t2 on t1.item = t2.item ''')

    return result

if __name__ == "__main__":

    spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

    inventory = spark.sql('''select * from inventory''')
    ratio     = spark.sql('''select * from inventory_ratio''')

    result = get_inventory_city(spark, inventory, ratio)

    result.write.csv(path="somepath/inventory_city", mode="overwrite")

修改後的函數get_inventory_city有3個輸入參數和1個返回參數,函數內部已經不再包含對spark session和資料庫表的處理,這意味著對於確定的輸入值,它總會輸出確定的結果。

這比之前的設計更加理想,因為函數只包含純邏輯,所以調用者使用它時不會再受到副作用的干擾,這使得函數的可測試性和可組合性得到了提高。

測試代碼

創建一個test_data目錄,將csv格式的測試數據保存到裡面。測試數據的來源可以是手工模擬製作,也可以是生產環境導出。

然後創建測試文件,添加代碼,

from inventory import get_inventory_city
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()

def test_get_inventory_city():

    #導入測試數據
    inventory = spark.read.format("csv").option("header", "true").load("./test_data/inventory.csv")
    ratio     = spark.read.format("csv").option("header", "true").load("./test_data/inventory_ratio.csv")

    #執行函數
    result = get_inventory_city(spark, inventory, ratio)

    #驗證拆分後的總數量等於拆分前的總數量
    result.createOrReplaceTempView('v_result')
    inventory.createOrReplaceTempView('v_inventory')

    qty_before_split = spark.sql('''select sum(qty) as qty from v_inventory''')
    qty_after_split  = spark.sql('''select sum(qty) as qty from v_result''')

    assert qty_before_split.take(1)[0]['qty'] == qty_after_split.take(1)[0]['qty']

執行測試,可以看到以下輸出內容

============================= test session starts =============================
platform win32 -- Python 3.6.8, pytest-4.3.1, py-1.8.0, pluggy-0.9.0
rootdir: C:\Users\zhaozhe42\PycharmProjects\spark_unit\unit, inifile:collected 1 item

test_get_inventory_city.py .2019-03-21 14:16:24 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
                                             [100%]
========================= 1 passed in 18.06 seconds ==========================

這樣一個單元測試例子就完成了。

相比把程式放到伺服器測試,單元測試的運行速度更快,開發者不用再擔心測試會對用戶造成影響,也可以更早發現在編碼期間犯下的錯誤。它也可以成為自動化測試的基礎。

待解決的問題

目前我已經可以在項目中構建初步的單元測試,但依然面臨著一些問題。

運行時間

上面這個簡單的測試示例在我的聯想T470筆記本上需要花費18.06秒執行完成,而實際項目中的程式的複雜度要更高,執行時間也更長。執行時間過長一件糟糕的事情,因為單元測試的執行花費越大,就會越被開發者拒斥。面對顯示器等待單元測試執行完成的時間是難捱的。雖然相比於把程式丟到生產系統中執行,單元測試已經可以節約不少時間,但還不夠好。

接下來可能會嘗試的解決辦法:提升電腦配置/改變測試數據的導入方式。

有效範圍

在生產實踐中構建純函數是一件不太容易的事情,它對開發者的設計和編碼能力有相當的要求。

單元測試雖然能幫助發現一些問題和確定問題代碼範圍,但它似乎並不能揭示錯誤的原因。

筆者水平有限,目前寫出的代碼中仍有不少單元測試力所不能及的地方。可能需要在實踐中對它們進行改進,或者引入其它測試手段作為補充。

參考

一些參考內容。

配置

Getting Started with PySpark on Windows

win10下安裝pyspark

PyCharm中的pytest

pycharm 配置spark 2.2.0

閱讀

函數響應式領域建模

ABAP單元測試最佳實踐

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1、使用cat命令進行文件的縱向合併 1)掌握使用cat命令的縱向合併 a)例如:使用cat命令將test1、file1.txt和file2這三個文件縱向合併為file文件的命令為: cat test1 file1.txt file2>file b)例如:使用cat命令將file1.txt文件的內容 ...
  • 使用ssh登錄伺服器的時候,需要輸入ip地址、埠、用戶名、密碼等信息,比較麻煩,容易輸錯。還好,通過客戶端和伺服器的配置參數,可實現免密碼快速登錄。伺服器可通過保存客戶端的公鑰,用於驗證客戶端的身份,從而省去輸入密碼的步驟。客戶端也可通過配置伺服器參數來簡化登錄命令。本文主要是記錄了ssh面密碼快 ...
  • 在自己電腦上安裝一個mysql資料庫並啟動,碰到一些問題,總結一下 1、下載免安裝版mysql資料庫,百度下載了了5.7.25版本 2、在bin文件夾下找到my-defaults.ini文件,我這沒有,所以新建了一個my.ini文件 3、在文件里添加代碼 註:不要手動去創建data文件夾 4、安裝服 ...
  • 企業數據資產 有了大數據的光環,有了從數據中挖掘商業價值的方法和工具之後,那些原本存放在伺服器上平淡無奇的陳年舊數一夜之間身價倍增。按照世界經濟論壇報告的看法,“大數據為新財富,價值堪比石油"。《大數據時代》一書的作者維克托則樂觀地預測,數據列人企業資產負債表只是時間問題。 本質上,任何企業在生產活 ...
  • 一. 表關係 1. 創建如下表,並創建相關約束. # 創建班級表 create table class ( cid int primary key not null auto_increment, caption varchar(20) not null ); # 創建老師表 create tabl ...
  • 一.表約束 1.非空約束:not null 作用:定義表的某一列不能為空。 2.唯一約束:uniqe key 作用:確定某一列不能存在相同的值。 3.主鍵約束:primay key 作用:列中唯一標識一條數據,每張表裡面只能有一個主鍵;主要是幫助MYSQL更快的查找某一條信息。 特征:非空並唯一,當 ...
  • [20190321]smem的顯示缺陷.txt1.smem 加入-m參數顯示存在缺陷,map的信息不全:# smem -tk -m -U oracle -P "oraclepeis|ora_"Map PIDs AVGPSS PSS/u01/app/oracle/product/10.2.0/db_1 ...
  • (註:第一次寫,如有錯誤之處,希望指出,不勝感激,謝謝,不喜也勿噴) 一.MYSQL簡單描述 1.MYSQL是什麼? MYSQL是現在最流行的關係型資料庫管理系統之一; MYSQL是開源軟體; MYSQL是現在最流行的關係型資料庫管理系統之一; MYSQL是開源軟體; 關係型資料庫? 註:所謂關係型 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...