測試是軟體開發中的基礎,它經常被數據開發者忽視,但是它很重要。在本文中會展示如何使用Python的uniittest.mock庫,對一段PySpark代碼進行測試。筆者會從數據科學家的視角來進行工作,這意味著本文將不會深入某些軟體開發的細節。 本文鏈接:https://www.cnblogs.com ...
測試是軟體開發中的基礎,它經常被數據開發者忽視,但是它很重要。在本文中會展示如何使用Python的uniittest.mock庫,對一段PySpark代碼進行測試。筆者會從數據科學家的視角來進行工作,這意味著本文將不會深入某些軟體開發的細節。
本文鏈接:https://www.cnblogs.com/hhelibeb/p/10508692.html
英文原文:Stop mocking me! Unit tests in PySpark using Python’s mock library
單元測試和mock是什麼?
單元測試是一種測試代碼片段的方式,確保代碼片段按預期工作。Python中的uniittest.mock庫,允許人們將部分代碼替換為mock對象,並對人們使用這些mock對象的方式進行斷言。“mock”的功能如名字所示——它模仿代碼中的對象/變數的屬性。
最終目標:測試spark.sql(query)
PySpark中最簡單的創建dataframe的方式如下:
df = spark.sql("SELECT * FROM table")
雖然它很簡單,但依然應該被測試。
準備代碼和問題
假設我們為一家電子商務服裝公司服務,我們的目標是創建產品相似度表,用某些條件過濾數據,把它們寫入到HDFS中。
假設我們有如下的表:
1. Products. Columns: “item_id”, “category_id”.
2. Product_similarity (unfiltered). Columns: “item_id_1”, “item_id_2”, “similarity_score”.
(假設Product_similarity中的相似度分數在0~1之間,越接近1,就越相似。)
查看一對產品和他們的分數是很簡單的:
SELECT s.item_id_1, s.item_id_2, s.similarity_score FROM product_similarity s WHERE s.item_id_1 != s.item_id_2
where子句將和自身對比的項目移除。否則的話會得到分數為1的結果,沒有意義!
要是我們想要創建一個展示相同目錄下的產品的相似度的表呢?要是我們不關心鞋子和圍巾的相似度,但是想要比較不同的鞋子與鞋子、圍巾與圍巾呢?這會有點複雜,需要我們連接“product”和“product_similarity”兩個表。
查詢語句變為:
SELECT s.item_id_1, s.item_id_2, s.similarity_score FROM product_similarity s INNER JOIN products p ON s.item_id_1 = p.item_id INNER JOIN products q ON s.item_id_2 = q.item_id WHERE s.item_id_1 != s.item_id_2 AND p.category_id = q.category_i
我們也可能想得知與每個產品最相似的N個其它項目,在該情況下,查詢語句為:
SELECT s.item_id_1, s.item_id_2, s.similarity_score FROM ( SELECT s.item_id_1, s.item_id_2, s.similarity_score, ROW_NUMBER() OVER(PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num FROM product_similarity s INNER JOIN products p ON s.item_id_1 = p.item_id INNER JOIN products q ON s.item_id_2 = q.item_id WHERE s.item_id_1 != s.item_id_2 AND p.category_id = q.category_id ) WHERE row_num <= 10
(假設N=10)
現在,要是我們希望跨產品目錄比較和在產品目錄內比較兩種功能成為一個可選項呢?我們可以通過使用名為same_category的布爾變數,它會控制一個字元串變數same_category_q的值,並將其傳入查詢語句(通過.format())。如果same_category為True,則same_category_q中為inner join的內容,反之,則為空。查詢語句如下:
''' SELECT s.item_id_1, s.item_id_2, s.similarity_score FROM product_similarity s {same_category_q} '''.format(same_category_q='') # Depends on value of same_category boolean
(譯註:Python 3.6以上可以使用f-Strings代替format)
讓我們把它寫得更清楚點,用function包裝一下,
def make_query(same_category, table_paths): if same_category is True: same_category_q = ''' INNER JOIN {product_table} p ON s.item_id_1 = p.item_id INNER JOIN {product_table} q ON s.item_id_2 = q.item_id WHERE item_id_1 != item_id_2 AND p.category_id = q.category_id '''.format(product_table=table_paths["products"]["table"]) else: same_category_q = '' return same_category_q
到目前為止,很不錯。我們輸出了same_category_q,因此可以通過測試來確保它確實返回了所需的值。
回憶我們的目標,我們需要將dataframe寫入HDFS,我們可以通過如下方法來測試函數:
def create_new_table(spark, table_paths, params, same_category_q): similarity_table = table_paths["product_similarity"]["table"] created_table = spark.sql(create_table_query.format(similarity_table=similarity_table, same_category_q=same_category_q, num_items=params["num_items"])) # Write table to some path created_table.coalesce(1).write.save(table_paths["created_table"]["path"], format="orc", mode="Overwrite")
添加查詢的第一部分和一個主方法,完成我們的腳本,得到:
import pyspark from pyspark.sql import SparkSession create_table_query = ''' SELECT item_id_1, item_id_2 FROM ( SELECT item_id_1, item_id_2, ROW_NUMBER() OVER(PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num FROM {similarity_table} s {same_category_q} ) WHERE row_num <= {num_items} ''' def create_new_table(spark, table_paths, params, from_date, to_date, same_category_q): similarity_table = table_paths["product_similarity"]["table"] created_table = spark.sql(create_table_query.format(similarity_table=similarity_table, same_category_q=same_category_q, num_items=params["num_items"])) # Write table to some path created_table.coalesce(1).write.save(table_paths["created_table"]["path"], format="orc", mode="Overwrite") def make_query(same_category, table_paths): if same_category is True: same_category_q = ''' INNER JOIN {product_table} p ON s.item_id_1 = p.item_id INNER JOIN {product_table} q ON s.item_id_2 = q.item_id WHERE item_id_1 != item_id_2 AND p.category_id = q.category_id '''.format(product_table=table_paths["product_table"]["table"]) else: same_category_q = '' return same_category_q if __name__ == "__main__": spark = (SparkSession .builder .appName("testing_tutorial") .enableHiveSupport() .getOrCreate()) same_category = True # or False table_paths = foo # Assume paths are in some JSON params = bar same_category_q, target_join_q = make_query(same_category, table_paths)
create_new_table(spark, table_paths, params, same_category_q)
這裡的想法是,我們需要創建為腳本中的每個函數創建function,名字一般是test_name_of_function()。需要通過斷言來驗證function的行為符合預期。
測試查詢-make_query
首先,測試make_query。make_query有兩個輸入參數:一個布爾變數和某些表路徑。它會基於布爾變數same_category返回不同的same_category_q。我們做的事情有點像是一個if-then語句集:
1. If same_category is True, then same_category_q = “INNER JOIN …”
2. If same_category is False, then same_category_q = “” (empty)
我們要做的是模擬make_query的參數,把它們傳遞給function,接下來測試是否得到期望的輸出。因為test_paths是個目錄,我們無需模擬它。測試腳本如下,說明見註釋:
def test_make_query_true(mocker): # Create some fake table paths test_paths = { "product_table": { "table": "products", }, "similarity_table": { "table": "product_similarity" } } # Call the function with our paths and "True" same_category_q = make_query(True, test_paths) # We want same_category_q to be non-empty assert same_category_q != '' def test_make_query_false(mocker): # As above, create some fake paths test_paths = { "product_table": { "table": "products", }, "similarity_table": { "table": "product_similarity" } } same_category_q = make_query(False, test_paths) # This time, we want same_category_q to be empty assert same_category_q == ''
就是這麼簡單!
測試表創建
下一步,我們需要測試create_new_table的行為。逐步觀察function,我們可以看到它做了幾件事,有幾個地方可以進行斷言和模擬。註意,無論何時,只要程式中有某些類似df.write.save.something.anotherthing的內容,我們就需要模擬每個操作和它們的輸出。
- 這個function使用spark作為參數,這需要被模擬。
- 通過調用spark.sql(create_table_query.format(**some_args))來創建created_table。我們需要斷言spark.sql()只被調用了一次。我們也需要模擬spark.sql()的輸出。
- Coalesce created_table。保證調用coalesce()時的參數是1。模擬輸出。
- 寫coalesced table,我們需要模擬.write,模擬調用它的輸出。
- 將coalesced table保存到一個路徑。確保它的調用帶有正確的參數。
和前面一樣,測試腳本如下:
ef test_create_new_table(mocker): # Mock all our variables mock_spark = mock.Mock() mock_category_q = mock.Mock() mock_created_table = mock.Mock() mock_created_table_coalesced = mock.Mock() # Calling spark.sql with create_table_query returns created_table - we need to mock it mock_spark.sql.side_effect = [mock_created_table] # Mock the output of calling .coalesce on created_table mock_created_table.coalesce.return_value = mock_created_table_coalesced # Mock the .write as well mock_write = mock.Mock() # Mock the output of calling .write on the coalesced created table mock_created_table_coalesced.write = mock_write test_paths = { "product_table": { "table": "products", }, "similarity_table": { "table": "product_similarity" }, "created_table": { "path": "path_to_table", } } test_params = { "num_items": 10, } # Call our function with our mocks create_new_table(mock_spark, test_paths, test_params, mock_category_q) # We only want spark.sql to have been called once, so assert that assert 1 == mock_spark.sql.call_count # Assert that we did in fact call created_table.coalesce(1) mock_created_table.coalesce.assert_called_with(1) # Assert that the table save path was passed in properly mock_write.save.assert_called_with(test_paths["created_table"]["path"], format="orc", mode="Overwrite")
最後,把每樣東西保存在一個文件夾中,如果你想的話,你需要從相應的模塊中導入function,或者把所有東西放在同一個腳本中。
為了測試它,在命令行導航到你的文件夾(cd xxx),然後執行:
python -m pytest final_test.py.
你可以看到類似下麵的輸出,
serena@Comp-205:~/workspace$ python -m pytest testing_tutorial.py
============================= test session starts ==============================
platform linux -- Python 3.6.4, pytest-3.3.2, py-1.5.2, pluggy-0.6.0
rootdir: /home/serena/workspace/Personal,
inifile: plugins: mock-1.10.0 collected 3 items testing_tutorial.py ...
[100%]
=========================== 3 passed in 0.01 seconds ===========================
結語
以上是全部內容。希望你覺得有所幫助。當我試圖弄明白如何mock的時候,我希望可以遇到類似這樣一篇文章。
現在就去做吧,就像Stewie所說的那樣,(don’t) stop mocking me (functions)!