使用Python的Mock庫進行PySpark單元測試

来源:https://www.cnblogs.com/hhelibeb/archive/2019/03/11/10508692.html
-Advertisement-
Play Games

測試是軟體開發中的基礎,它經常被數據開發者忽視,但是它很重要。在本文中會展示如何使用Python的uniittest.mock庫,對一段PySpark代碼進行測試。筆者會從數據科學家的視角來進行工作,這意味著本文將不會深入某些軟體開發的細節。 本文鏈接:https://www.cnblogs.com ...


測試是軟體開發中的基礎,它經常被數據開發者忽視,但是它很重要。在本文中會展示如何使用Python的uniittest.mock庫,對一段PySpark代碼進行測試。筆者會從數據科學家的視角來進行工作,這意味著本文將不會深入某些軟體開發的細節。

本文鏈接:https://www.cnblogs.com/hhelibeb/p/10508692.html

英文原文:Stop mocking me! Unit tests in PySpark using Python’s mock library

單元測試和mock是什麼?

單元測試是一種測試代碼片段的方式,確保代碼片段按預期工作。Python中的uniittest.mock庫,允許人們將部分代碼替換為mock對象,並對人們使用這些mock對象的方式進行斷言。“mock”的功能如名字所示——它模仿代碼中的對象/變數的屬性。

最終目標:測試spark.sql(query)

PySpark中最簡單的創建dataframe的方式如下:

df = spark.sql("SELECT * FROM table")

雖然它很簡單,但依然應該被測試。

準備代碼和問題

假設我們為一家電子商務服裝公司服務,我們的目標是創建產品相似度表,用某些條件過濾數據,把它們寫入到HDFS中。

假設我們有如下的表:

1. Products. Columns: “item_id”, “category_id”.
2. Product_similarity (unfiltered). Columns: “item_id_1”, “item_id_2”, “similarity_score”.

(假設Product_similarity中的相似度分數在0~1之間,越接近1,就越相似。)

查看一對產品和他們的分數是很簡單的:

SELECT
s.item_id_1,
s.item_id_2,
s.similarity_score
FROM product_similarity s
WHERE s.item_id_1 != s.item_id_2

where子句將和自身對比的項目移除。否則的話會得到分數為1的結果,沒有意義!

要是我們想要創建一個展示相同目錄下的產品的相似度的表呢?要是我們不關心鞋子和圍巾的相似度,但是想要比較不同的鞋子與鞋子、圍巾與圍巾呢?這會有點複雜,需要我們連接“product”和“product_similarity”兩個表。

查詢語句變為:

SELECT
  s.item_id_1,
  s.item_id_2,
  s.similarity_score
FROM product_similarity s
INNER JOIN products p
ON s.item_id_1 = p.item_id
INNER JOIN products q
ON s.item_id_2 = q.item_id
WHERE s.item_id_1 != s.item_id_2
AND p.category_id = q.category_i

我們也可能想得知與每個產品最相似的N個其它項目,在該情況下,查詢語句為:

SELECT
    s.item_id_1,
    s.item_id_2,
    s.similarity_score
FROM (
    SELECT
        s.item_id_1,
        s.item_id_2,
        s.similarity_score,
        ROW_NUMBER() OVER(PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num
    FROM product_similarity s
    INNER JOIN products p
    ON s.item_id_1 = p.item_id
    INNER JOIN products q
    ON s.item_id_2 = q.item_id
    WHERE s.item_id_1 != s.item_id_2
    AND p.category_id = q.category_id
)
WHERE row_num <= 10

(假設N=10)

現在,要是我們希望跨產品目錄比較和在產品目錄內比較兩種功能成為一個可選項呢?我們可以通過使用名為same_category的布爾變數,它會控制一個字元串變數same_category_q的值,並將其傳入查詢語句(通過.format())。如果same_category為True,則same_category_q中為inner join的內容,反之,則為空。查詢語句如下:

'''
SELECT
  s.item_id_1,
  s.item_id_2,
  s.similarity_score
FROM product_similarity s
{same_category_q}
'''.format(same_category_q='') # Depends on value of same_category boolean

(譯註:Python 3.6以上可以使用f-Strings代替format)

讓我們把它寫得更清楚點,用function包裝一下,

def make_query(same_category, table_paths): 

    if same_category is True:
        same_category_q = '''
INNER JOIN {product_table} p
ON s.item_id_1 = p.item_id
INNER JOIN {product_table} q
ON s.item_id_2 = q.item_id
WHERE item_id_1 != item_id_2
AND p.category_id = q.category_id
'''.format(product_table=table_paths["products"]["table"])
    else:
        same_category_q = ''

return same_category_q

到目前為止,很不錯。我們輸出了same_category_q,因此可以通過測試來確保它確實返回了所需的值。

回憶我們的目標,我們需要將dataframe寫入HDFS,我們可以通過如下方法來測試函數:

def create_new_table(spark, table_paths, params, same_category_q):

    similarity_table = table_paths["product_similarity"]["table"]

    created_table = spark.sql(create_table_query.format(similarity_table=similarity_table,
                                                        same_category_q=same_category_q,
                                                        num_items=params["num_items"]))

    # Write table to some path
    created_table.coalesce(1).write.save(table_paths["created_table"]["path"],
format="orc", mode="Overwrite")

添加查詢的第一部分和一個主方法,完成我們的腳本,得到:

import pyspark
from pyspark.sql import SparkSession

create_table_query = '''
SELECT
    item_id_1,
    item_id_2
FROM (
    SELECT
        item_id_1,
        item_id_2,
        ROW_NUMBER() OVER(PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num
    FROM {similarity_table} s
    {same_category_q}
    )
WHERE row_num <= {num_items}
'''

def create_new_table(spark, table_paths, params, from_date, to_date, same_category_q):

    similarity_table = table_paths["product_similarity"]["table"]

    created_table = spark.sql(create_table_query.format(similarity_table=similarity_table,
                                                        same_category_q=same_category_q,
                                                        num_items=params["num_items"]))

    # Write table to some path
    created_table.coalesce(1).write.save(table_paths["created_table"]["path"],
                                          format="orc", mode="Overwrite")


def make_query(same_category, table_paths): 

    if same_category is True:
        same_category_q = '''
INNER JOIN {product_table} p
ON s.item_id_1 = p.item_id
INNER JOIN {product_table} q
ON s.item_id_2 = q.item_id
WHERE item_id_1 != item_id_2
AND p.category_id = q.category_id
'''.format(product_table=table_paths["product_table"]["table"])
    else:
        same_category_q = ''

    return same_category_q
  
if __name__ == "__main__":
   
    spark = (SparkSession
             .builder
             .appName("testing_tutorial")
             .enableHiveSupport()
             .getOrCreate())

    same_category = True # or False
    table_paths = foo # Assume paths are in some JSON 
    params = bar
    
    same_category_q, target_join_q = make_query(same_category, table_paths)
create_new_table(spark, table_paths, params, same_category_q)

這裡的想法是,我們需要創建為腳本中的每個函數創建function,名字一般是test_name_of_function()。需要通過斷言來驗證function的行為符合預期。

測試查詢-make_query

首先,測試make_query。make_query有兩個輸入參數:一個布爾變數和某些表路徑。它會基於布爾變數same_category返回不同的same_category_q。我們做的事情有點像是一個if-then語句集:

1. If same_category is True, then same_category_q = “INNER JOIN …”
2. If same_category is False, then same_category_q = “” (empty)

我們要做的是模擬make_query的參數,把它們傳遞給function,接下來測試是否得到期望的輸出。因為test_paths是個目錄,我們無需模擬它。測試腳本如下,說明見註釋:

def test_make_query_true(mocker):

    # Create some fake table paths
    test_paths = {
        "product_table": {
            "table": "products",
        },
        "similarity_table": {
            "table": "product_similarity"
        }
    }

    # Call the function with our paths and "True"
    same_category_q = make_query(True, test_paths)
    # We want same_category_q to be non-empty
    assert same_category_q != ''

def test_make_query_false(mocker):

    # As above, create some fake paths
    test_paths = {
        "product_table": {
            "table": "products",
        },
        "similarity_table": {
            "table": "product_similarity"
        }
    }

    same_category_q = make_query(False, test_paths)
    # This time, we want same_category_q to be empty
assert same_category_q == ''

就是這麼簡單!

測試表創建

下一步,我們需要測試create_new_table的行為。逐步觀察function,我們可以看到它做了幾件事,有幾個地方可以進行斷言和模擬。註意,無論何時,只要程式中有某些類似df.write.save.something.anotherthing的內容,我們就需要模擬每個操作和它們的輸出。

  1. 這個function使用spark作為參數,這需要被模擬。
  2. 通過調用spark.sql(create_table_query.format(**some_args))來創建created_table。我們需要斷言spark.sql()只被調用了一次。我們也需要模擬spark.sql()的輸出。
  3. Coalesce created_table。保證調用coalesce()時的參數是1。模擬輸出。
  4. 寫coalesced table,我們需要模擬.write,模擬調用它的輸出。
  5. 將coalesced table保存到一個路徑。確保它的調用帶有正確的參數。

和前面一樣,測試腳本如下:

ef test_create_new_table(mocker):

    # Mock all our variables
    mock_spark = mock.Mock()
    mock_category_q = mock.Mock()
    mock_created_table = mock.Mock()
    mock_created_table_coalesced = mock.Mock()
    # Calling spark.sql with create_table_query returns created_table - we need to mock it
    mock_spark.sql.side_effect = [mock_created_table]
    # Mock the output of calling .coalesce on created_table
    mock_created_table.coalesce.return_value = mock_created_table_coalesced
    # Mock the .write as well
    mock_write = mock.Mock()
    # Mock the output of calling .write on the coalesced created table
    mock_created_table_coalesced.write = mock_write

    test_paths = {
        "product_table": {
            "table": "products",
        },
        "similarity_table": {
            "table": "product_similarity"
        },
        "created_table": {
          "path": "path_to_table",
        }
    }
    test_params = {
        "num_items": 10,
    }

    # Call our function with our mocks
    create_new_table(mock_spark, test_paths, test_params, mock_category_q)
    # We only want spark.sql to have been called once, so assert that
    assert 1 == mock_spark.sql.call_count
    # Assert that we did in fact call created_table.coalesce(1)
    mock_created_table.coalesce.assert_called_with(1)
    # Assert that the table save path was passed in properly
    mock_write.save.assert_called_with(test_paths["created_table"]["path"],
format="orc", mode="Overwrite")

最後,把每樣東西保存在一個文件夾中,如果你想的話,你需要從相應的模塊中導入function,或者把所有東西放在同一個腳本中。

為了測試它,在命令行導航到你的文件夾(cd xxx),然後執行:

python -m pytest final_test.py.

你可以看到類似下麵的輸出,

serena@Comp-205:~/workspace$ python -m pytest testing_tutorial.py
============================= test session starts ==============================
platform linux -- Python 3.6.4, pytest-3.3.2, py-1.5.2, pluggy-0.6.0
rootdir: /home/serena/workspace/Personal,
inifile: plugins: mock-1.10.0 collected 3 items testing_tutorial.py ...
[100%]
=========================== 3 passed in 0.01 seconds ===========================

結語

以上是全部內容。希望你覺得有所幫助。當我試圖弄明白如何mock的時候,我希望可以遇到類似這樣一篇文章。

現在就去做吧,就像Stewie所說的那樣,(don’t) stop mocking me (functions)!

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 條件查詢 使用Where進行數據篩選結果為True的會出現在結果集裡面 select 欄位 from 表名 where 條件; # 例: select * from test_table where id > 2; # 篩選出id大於2的所有欄位 比較運算符 等於= 大於> 大於等於>= 小於< 小 ...
  • connect by 用於存在父子,祖孫,上下級等層級關係的數據表進行層級查詢。 語法格式: { CONNECT BY [ NOCYCLE ] condition [AND condition]... [ START WITH condition ] | START WITH condition C ...
  • SQL優化是老生常談的話題。隨著關係型資料庫的發展,資料庫內部現在可以進行一些優化。在查詢分析,查詢檢查,資料庫內部會代數優化和物理優化之後再執行。但是,這需要我們理解資料庫內部規律才能進行。現在,我們需要找出RDBMS的優化規律,以寫出適合RDBMS自動優化的SQL語句。只看SQL優化總結,可以翻 ...
  • 通過本篇文章我們來學習一下CASE表達式的基本使用方法。 CASE表達式有簡單 CASE表達式(simple case expression)和搜索 CASE表達式(searched caseexpression)兩種寫法,它們分別如下所示。 CASE 表達式的寫法 我們在編寫 SQL 語句的時候需 ...
  • 此篇介紹下psql下dblink的使用方式,幫助自己記錄以備後需。dblink是psql下的擴展功能,可以實現在一個資料庫中遠程操作另外一個資料庫,是實現跨庫的一種方法。下麵步入正文。 安裝dblink 安裝方式自行百度(psql資料庫預設是安裝了的,可先在伺服器上查看),安裝完後$PGHOME下的 ...
  • 我只是搬運工。。。 1.我也下載了,好像不能超過500M每次,100個站。下了也不會看。有沒有高手能介紹下專門下載某個省的所有氣象站氣溫資料的方法,從而計算出每個站每月的平均氣溫。。格式為txt。2 http://www.esrl.noaa.gov/psd/data/gridded/reanalys ...
  • 本文由雲+社區發表 作者:漆洪凱 規則1 :一般情況可以選擇MyISAM存儲引擎,如果需要事務支持必須使用InnoDB存儲引擎。 註意:MyISAM存儲引擎 B tree索引有一個很大的限制:參與一個索引的所有欄位的長度之和不能超過1000位元組。另外MyISAM數據和索引是分開,而InnoDB的數據 ...
  • 存儲過程如同一門程式設計語言,同樣包含了數據類型、流程式控制制、輸入和輸出和它自己的函數庫。 一、基本數據類型:略 二、變數: 自定義變數:DECLARE a INT ; SET a=100; 可用以下語句代替:DECLARE a INT DEFAULT 100; 變數分為用戶變數和系統變數,系統變數又 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...