## 實踐環境 python 3.6.2 Joblib ## 簡介 Joblib是一組在Python中提供輕量級流水線的工具。特別是: 1. 函數的透明磁碟緩存和延遲重新計算(記憶模式) 2. 簡單易用的並行計算 Joblib已被優化得很快速,很健壯了,特別是在大數據上,並對numpy數組進行了特定 ...
實踐環境
python 3.6.2
Joblib
簡介
Joblib是一組在Python中提供輕量級流水線的工具。特別是:
- 函數的透明磁碟緩存和延遲重新計算(記憶模式)
- 簡單易用的並行計算
Joblib已被優化得很快速,很健壯了,特別是在大數據上,並對numpy數組進行了特定的優化。
主要功能
-
輸出值的透明快速磁碟緩存(Transparent and fast disk-caching of output value): Python函數的記憶體化或類似make的功能,適用於任意Python對象,包括非常大的numpy數組。通過將操作寫成一組具有定義良好的輸入和輸出的步驟:Python函數,將持久性和流執行邏輯與域邏輯或演算法代碼分離開來。Joblib可以將其計算保存到磁碟上,並僅在必要時重新運行:
原文:
Transparent and fast disk-caching of output value: a memoize or make-like functionality for Python functions that works well for arbitrary Python objects, including very large numpy arrays. Separate persistence and flow-execution logic from domain logic or algorithmic code by writing the operations as a set of steps with well-defined inputs and outputs: Python functions. Joblib can save their computation to disk and rerun it only if necessary:
>>> from joblib import Memory >>> cachedir = 'your_cache_dir_goes_here' >>> mem = Memory(cachedir) >>> import numpy as np >>> a = np.vander(np.arange(3)).astype(float) >>> square = mem.cache(np.square) >>> b = square(a) ______________________________________________________________________... [Memory] Calling square... square(array([[0., 0., 1.], [1., 1., 1.], [4., 2., 1.]])) _________________________________________________...square - ...s, 0.0min >>> c = square(a) # The above call did not trigger an evaluation
-
並行助手(parallel helper):輕鬆編寫可讀的並行代碼並快速調試
>>> from joblib import Parallel, delayed >>> from math import sqrt >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] >>> res = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10)) >>> res [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
-
快速壓縮的持久化(Fast compressed Persistence):代替pickle在包含大數據的Python對象上高效工作(
joblib.dump
&joblib.load
)。
parallel for loops
常見用法
Joblib提供了一個簡單的助手類,用於使用多進程為迴圈實現並行。核心思想是將要執行的代碼編寫為生成器表達式,並將其轉換為並行計算
>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
使用以下代碼,可以分佈到2個CPU上:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
輸出可以是一個生成器,在可以獲取結果時立即返回結果,即使後續任務尚未完成。輸出的順序始終與輸入的順序相匹配:輸出的順序總是匹配輸入的順序:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> parallel = Parallel(n_jobs=2, return_generator=True) # py3.7往後版本才支持return_generator參數
>>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
>>> print(type(output_generator))
<class 'generator'>
>>> print(next(output_generator))
0.0
>>> print(next(output_generator))
1.0
>>> print(list(output_generator))
[2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
此生成器允許減少joblib.Parallel的記憶體占用調用
基於線程的並行VS基於進程的並行
預設情況下,joblib.Parallel
使用'loky'
後端模塊啟動單獨的Python工作進程,以便在分散的CPU上同時執行任務。對於一般的Python程式來說,這是一個合理的預設值,但由於輸入和輸出數據需要在隊列中序列化以便同工作進程進行通信,因此可能會導致大量開銷(請參閱序列化和進程)。
當你知道你調用的函數是基於一個已編譯的擴展,並且該擴展在大部分計算過程中釋放了Python全局解釋器鎖(GIL)時,使用線程而不是Python進程作為併發工作者會更有效。例如,在Cython函數的with nogil 塊中編寫CPU密集型代碼。
如果希望代碼有效地使用線程,只需傳遞preferre='threads'
作為joblib.Parallel
構造函數的參數即可。在這種情況下,joblib將自動使用"threading"
後端,而不是預設的"loky"
後端
>>> Parallel(n_jobs=2, prefer=threads')(
... delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
也可以在上下文管理器的幫助下手動選擇特定的後端實現:
>>> from joblib import parallel_backend
>>> with parallel_backend('threading', n_jobs=2):
... Parallel()(delayed(sqrt)(i ** 2) for i in range(10))
...
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
後者在調用內部使用joblib.Parallel
的庫時特別有用,不會將後端部分作為其公共API的一部分公開。
'loky'
後端可能並不總是可獲取。
一些罕見的系統不支持多處理(例如Pyodide)。在這種情況下,loky後端不可用,使用線程作為預設後端。
除了內置的joblib後端之外,還可以使用幾個特定於集群的後端:
- 用於Dask集群的Dask後端 (查閱Using Dask for single-machine parallel computing 以獲取示例),
- 用於Ray集群的Ray後端
- 用於Spark集群上分發joblib任務的Joblib Apache Spark Backend
序列化與進程
要在多個python進程之間共用函數定義,必須依賴序列化協議。python中的標準協議是pickle
,但它在標準庫中的預設實現有幾個限制。例如,它不能序列化互動式定義的函數或在__main__
模塊中定義的函數。
為了避免這種限制,loky
後端現在依賴於cloudpickle
以序列化python對象。cloudpickle
是pickle
協議的另一種實現方式,允許序列化更多的對象,特別是互動式定義的函數。因此,對於大多數用途,loky
後端應該可以完美的工作。
cloudpickle
的主要缺點就是它可能比標準類庫中的pickle
慢,特別是,對於大型python字典或列表來說,這一點至關重要,因為它們的序列化時間可能慢100倍。有兩種方法可以更改 joblib
的序列化過程以緩和此問題:
-
如果您在UNIX系統上,則可以切換回舊的
multiprocessing
後端。有了這個後端,可以使用很快速的pickle
在工作進程中共用互動式定義的函數。該解決方案的主要問題是,使用fork
啟動進程會破壞標準POSIX,並可能與numpy
和openblas
等第三方庫進行非正常交互。 -
如果希望將
loky
後端與不同的序列化庫一起使用,則可以設置LOKY_PICKLER=mod_pickle
環境變數,以使用mod_pickle
作為loky
的序列化庫。作為參數傳遞的模塊mod_pickle
應按import mod_picke
導入,並且應包含一個Pickler
對象,該對象將用於序列化為對象。可以設置LOKY_PICKLER=pickle
以使用表中類庫中的pickling模塊。LOKY_PICKLER=pickle
的主要缺點是不能序列化互動式定義的函數。為瞭解決該問題,可以將此解決方案與joblib.wrap_non_picklable_objects()
一起使用,joblib.wrap_non_picklable_objects()
可用作裝飾器以為特定對下本地啟用cloudpickle
。通過這種方式,可以為所有python對象使用速度快的picking,併在本地為互動式函數啟用慢速的pickling。查閱loky_wrapper獲取示例。
共用記憶體語義
joblib的預設後端將在獨立的Python進程中運行每個函數調用,因此它們不能更改主程式中定義的公共Python對象。
然而,如果並行函數確實需要依賴於線程的共用記憶體語義,則應顯示的使用require='sharemem'
,例如:
>>> shared_set = set()
>>> def collect(x):
... shared_set.add(x)
...
>>> Parallel(n_jobs=2, require='sharedmem')(
... delayed(collect)(i) for i in range(5))
[None, None, None, None, None]
>>> sorted(shared_set)
[0, 1, 2, 3, 4]
請記住,從性能的角度來看,依賴共用記憶體語義可能是次優的,因為對共用Python對象的併發訪問將受到鎖爭用的影響。
註意,不使用共用記憶體的情況下,任務進程之間的記憶體資源是相互獨立的,舉例說明如下:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from joblib import Parallel, delayed, parallel_backend
from collections import deque
GLOBAL_LIST = []
class TestClass():
def __init__(self):
self.job_queue = deque()
def add_jobs(self):
i = 0
while i < 3:
time.sleep(1)
i += 1
GLOBAL_LIST.append(i)
self.job_queue.append(i)
print('obj_id:', id(self), 'job_queue:', self.job_queue, 'global_list:', GLOBAL_LIST)
def get_job_queue_list(obj):
i = 0
while not obj.job_queue and i < 3:
time.sleep(1)
i += 1
print('obj_id:', id(obj), 'job_queue:', obj.job_queue, 'global_list:', GLOBAL_LIST)
return obj.job_queue
if __name__ == "__main__":
obj = TestClass()
def test_fun():
with parallel_backend("multiprocessing", n_jobs=2):
Parallel()(delayed(get_job_queue_list)(obj) for i in range(2))
thread = threading.Thread(target=test_fun, name="parse_log")
thread.start()
time.sleep(1)
obj.add_jobs()
print('global_list_len:', len(GLOBAL_LIST))
控制台輸出:
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1]) global_list: [1]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2]) global_list: [1, 2]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2, 3]) global_list: [1, 2, 3]
global_list_len: 3
通過輸出可知,通過joblib.Parallel開啟的進程,其占用記憶體和主線程占用的記憶體資源是相互獨立
復用worer池
一些演算法需要對並行函數進行多次連續調用,同時對中間結果進行處理。在一個迴圈中多次調用joblib.Parallel
次優的,因為它會多次創建和銷毀一個workde(線程或進程)池,這可能會導致大量開銷。
在這種情況下,使用joblib.Parallel
類的上下文管理器API更有效,以便對joblib.Parallel
對象的多次調用可以復用同一worker池。
from joblib import Parallel, delayed
from math import sqrt
with Parallel(n_jobs=2) as parallel:
accumulator = 0.
n_iter = 0
while accumulator < 1000:
results = parallel(delayed(sqrt)(accumulator + i ** 2) for i in range(5))
accumulator += sum(results) # synchronization barrier
n_iter += 1
print(accumulator, n_iter) #輸出: 1136.5969161564717 14
請註意,現在基於進程的並行預設使用'loky'
後端,該後端會自動嘗試自己維護和重用worker池,即使是在沒有上下文管理器的調用中也是如此
筆者實踐發現,即便採用這種實現方式,其運行效率也是非常低下的,應該儘量避免這種設計(實踐環境 Python3.6)
...略
Parallel參考文檔
class joblib.Parallel(n_jobs=default(None), backend=None, return_generator=False, verbose=default(0), timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None))
常用參數說明
-
n_jobs
:int, 預設:None
併發運行作業的最大數量,例如當
backend='multiprocessing'
時Python工作進程的數量,或者當backend='threading'
時線程池的大小。如果設置為 -1,則使用所有CPU。如果設置為1,則根本不使用並行計算代碼,並且行為相當於一個簡單的python for迴圈。此模式與timeout
不相容。如果n_jobs
小於-1,則使用(n_cpus+1+n_jobs)
。因此,如果n_jobs=-2
,將使用除一個CPU之外的所有CPU。如果為None
,則預設n_jobs=1
,除非在parallel_backend()
上下文管理器下執行調用,此時會為n_jobs
設置另一個值。 -
backend
: str,ParallelBackendBase
實例或者None
, 預設:'loky'
指定並行化後端實現。支持的後端有:
-
loky
在與工作Python進程交換輸入和輸出數據時,預設使用的loky
可能會導致一些通信和記憶體開銷。在一些罕見的系統(如Pyiode)上,loky
後端可能不可用。 -
multiprocessing
以前基於進程的後端,基於multiprocessing.Pool
。不如loky健壯。 -
threading
是一個開銷很低的後端,但如果被調用的函數大量依賴於Python對象,它會受到Python GIL的影響。當執行瓶頸是顯式釋放GIL的已編譯擴展時,threading
最有用(例如,with-nogil
塊中封裝的Cython迴圈或對NumPy等庫的昂貴調用)。 -
最後,可以通過調用
register_pallel_backend()
來註冊後端。
不建議在類庫中調用
Parallel
時對backend
名稱進行硬編碼,取而代之,建議設置軟提示(prefer
)或硬約束(require
),以便庫用戶可以使用parallel_backend()
上下文管理器從外部更改backend
。 -
-
return_generator
: bool如果為
True
,則對此實例的調用將返回一個生成器,併在結果可獲取時立即按原始順序返回結果。請註意,預期用途是一次運行一個調用。對同一個Parallel對象的多次調用將導致RuntimeError
-
prefer
: str 可選值‘processes’
,‘threads’
,None
, 預設:None
如果使用
parallel_backen()
上下文管理器時沒有指定特定後端,則選擇預設prefer
給定值。預設的基於進程的後端是loky
,而預設的基於線程的後端則是threading
。如果指定了backend
參數,則忽略該參數。 -
require
:‘sharedmem’
或者None
, 預設None
用於選擇後端的硬約束。如果設置為
'sharedmem'
,則所選後端將是單主機和基於線程的,即使用戶要求使用具有parallel_backend
的非基於線程的後端。
參考文檔
https://joblib.readthedocs.io/en/latest/
https://joblib.readthedocs.io/
https://joblib.readthedocs.io/en/latest/parallel.html#common-usage
作者:授客
微信/QQ:1033553122
全國軟體測試QQ交流群:7156436
Git地址:https://gitee.com/ishouke
友情提示:限於時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞後如有任何疑問,請聯繫我!!!
微信打賞
支付寶打賞 全國軟體測試交流QQ群