一、多進程 1.1 多進程的概念 由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。藉助 ...
一、多進程
1.1 多進程的概念
由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。Python提供了非常好用的多進程包multiprocessing,只需要定義一個函數,Python會完成其他所有事情。藉助這個包,可以輕鬆完成從單進程到併發執行的轉換。multiprocessing支持子進程、通信和共用數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程式內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。
但在使用這些共用API的時候,我們要註意以下幾點:
a、在UNIX平臺上,當某個進程終結之後,該進程需要被其父進程調用wait,否則進程成為僵屍進程(Zombie)。所以,有必要對每個Process對象調用join()方法 (實際上等同於wait)。對於多線程來說,由於只有一個進程,所以不存在此必要性。
b、Windows系統下,需要註意的是要想啟動一個子進程,必須加上 if __name__ == "__main__",進程相關的要寫在這句下麵。
1.2 創建進程的兩種方式
直接創建:
1 from multiprocessing import Process 2 import time 3 def f(name): 4 time.sleep(1) 5 print('hello', name,time.ctime()) 6 7 if __name__ == '__main__': 8 p_list=[] 9 for i in range(3): 10 p = Process(target=f, args=('alvin',)) 11 p_list.append(p) 12 p.start() 13 for i in p_list: 14 p.join() 15 print('end')Demo1
類式調用:
1 from multiprocessing import Process 2 import time 3 4 class MyProcess(Process): 5 def __init__(self): 6 super(MyProcess, self).__init__() 7 #self.name = name 8 9 def run(self): 10 time.sleep(1) 11 print ('hello', self.name,time.ctime()) 12 13 14 if __name__ == '__main__': 15 p_list=[] 16 for i in range(3): 17 p = MyProcess() 18 p.start() 19 p_list.append(p) 20 21 for p in p_list: 22 p.join() 23 24 print('end')Demo2
二、Process類
2.1 構造方法
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前還沒有實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 進程名;
args/kwargs: 要傳入方法的參數。
2.2 實例方法
is_alive():返回進程是否在運行。
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
start():進程準備就緒,等待CPU調度
run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t預設run()方法。
terminate():不管任務是否完成,立即停止工作進程
2.3 屬性
authkey
daemon:和線程的setDeamon功能一樣
exitcode(進程在運行時為None、如果為–N,表示被信號N結束)
name:進程名字。
pid:進程號。
1 import time 2 from multiprocessing import Process 3 4 def foo(i): 5 time.sleep(1) 6 print (p.is_alive(),i,p.pid) 7 time.sleep(1) 8 9 if __name__ == '__main__': 10 p_list=[] 11 for i in range(10): 12 p = Process(target=foo, args=(i,)) 13 #p.daemon=True 14 p_list.append(p) 15 16 for p in p_list: 17 p.start() 18 # for p in p_list: 19 # p.join() 20 21 print('main process end')Demo
三、進程間通信
不同進程間記憶體是不共用的,要想實現兩個進程間的數據交換,可以用以下方法:
3.1 Queues 使用方法跟threading里的queue類似 --> 將q作為參數傳遞給子進程。
1 from multiprocessing import Process, Queue 2 3 def f(q,n): 4 q.put([42, n, 'hello']) 5 6 if __name__ == '__main__': 7 q = Queue() 8 p_list=[] 9 for i in range(3): 10 p = Process(target=f, args=(q,i)) 11 p_list.append(p) 12 p.start() 13 print(q.get()) 14 print(q.get()) 15 print(q.get()) 16 for i in p_list: 17 i.join()Demo1
3.2 Pipes --> 通過管道Pipe實現。
1 from multiprocessing import Process, Pipe 2 3 def f(conn): 4 conn.send([42, None, 'hello']) 5 conn.close() 6 7 if __name__ == '__main__': 8 parent_conn, child_conn = Pipe() 9 p = Process(target=f, args=(child_conn,)) 10 p.start() 11 print(parent_conn.recv()) # prints "[42, None, 'hello']" 12 p.join()Demo2
3.3 數據共用(Manager)
Manager()返回的管理器對象控制一個伺服器進程,該進程保存Python對象並允許其他進程使用代理操作它們。
1 from multiprocessing import Process, Manager 2 3 def f(d, l,n): 4 d[n] = '1' 5 d['2'] = 2 6 d[0.25] = None 7 l.append(n) 8 print(l) 9 10 if __name__ == '__main__': 11 with Manager() as manager: 12 d = manager.dict() 13 14 l = manager.list(range(5)) 15 p_list = [] 16 for i in range(10): 17 p = Process(target=f, args=(d, l,i)) 18 p.start() 19 p_list.append(p) 20 for res in p_list: 21 res.join() 22 23 print(d) 24 print(l)Demo
3.4 進程同步
Without using the lock output from the different processes is liable to get all mixed up. 如果不使用來自不同進程的鎖定輸出,則可能會混淆不清。
1 from multiprocessing import Process, Lock 2 3 def f(l, i): 4 l.acquire() 5 try: 6 print('hello world', i) 7 finally: 8 l.release() 9 10 if __name__ == '__main__': 11 lock = Lock() 12 13 for num in range(10): 14 Process(target=f, args=(lock, num)).start()
3.5 進程池
進程池內部維護一個進程式列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那麼程式就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
- apply
- apply_async
1 from multiprocessing import Process,Pool 2 import time 3 4 def Foo(i): 5 time.sleep(2) 6 return i+100 7 8 def Bar(arg): 9 print('-->exec done:',arg) 10 11 pool = Pool(5) 12 13 for i in range(10): 14 pool.apply_async(func=Foo, args=(i,),callback=Bar) 15 #pool.apply(func=Foo, args=(i,)) 16 17 print('end') 18 pool.close() 19 pool.join()
四、協程
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
協程的好處:
- 無需線程上下文切換的開銷
- 無需原子操作鎖定及同步的開銷
- "原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
- 方便切換控制流,簡化編程模型
- 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高併發處理。
缺點:
- 無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
- 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程式
4.1 使用yield實現協程操作
1 import time
2 import queue
3 def consumer(name): 4 print("--->starting eating baozi...") 5 while True: 6 new_baozi = yield 7 print("[%s] is eating baozi %s" % (name,new_baozi)) 8 #time.sleep(1) 9 10 def producer(): 11 12 r = con.__next__() 13 r = con2.__next__() 14 n = 0 15 while n < 5: 16 n +=1 17 con.send(n) 18 con2.send(n) 19 print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) 20 21 22 if __name__ == '__main__': 23 con = consumer("c1") # 創建生成器對象 24 con2 = consumer("c2") # 創建生成器對象 25 p = producer() # 執行producer函數
協程標准定義,即符合什麼條件就能稱之為協程:
- 必須在只有一個單線程里實現併發
- 修改共用數據不需加鎖
- 用戶程式里自己保存多個控制流的上下文棧
- 一個協程遇到IO操作自動切換到其它協程
基於上面這4點定義,我們剛纔用yield實現的程並不能算是合格的線程,因為它有一點功能沒實現。
4.2 Greenlet
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator
1 # -*- coding:utf-8 -*- 2 3 4 from greenlet import greenlet 5 6 def test1(): 7 print(12) 8 gr2.switch() 9 print(34) 10 gr2.switch() 11 12 def test2(): 13 print(56) 14 gr1.switch() 15 print(78) 16 17 gr1 = greenlet(test1) 18 gr2 = greenlet(test2) 19 gr1.switch()
感覺確實用著比generator還簡單了,但好像還沒有解決一個問題,就是遇到IO操作,自動切換,並不對。
4.3 Gevent
Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程式操作系統進程的內部,但它們被協作式地調度。
1 import gevent 2 3 def func1(): 4 print('\033[31;1m李闖在跟海濤搞...\033[0m') 5 gevent.sleep(2) 6 print('\033[31;1m李闖又回去跟繼續跟海濤搞...\033[0m') 7 8 def func2(): 9 print('\033[32;1m李闖切換到了跟海龍搞...\033[0m') 10 gevent.sleep(1) 11 print('\033[32;1m李闖搞完了海濤,回來繼續跟海龍搞...\033[0m') 12 13 14 gevent.joinall([ 15 gevent.spawn(func1), 16 gevent.spawn(func2), 17 #gevent.spawn(func3), 18 ])
輸出:
李闖在跟海濤搞...
李闖切換到了跟海龍搞...
李闖搞完了海濤,回來繼續跟海龍搞...
李闖又回去跟繼續跟海濤搞...
五、補充
5.1 同步與非同步的性能區別
1 import gevent 2 3 def task(pid): 4 """ 5 Some non-deterministic task 6 """ 7 gevent.sleep(0.5) 8 print('Task %s done' % pid) 9 10 def synchronous(): 11 for i in range(1,10): 12 task(i) 13 14 def asynchronous(): 15 threads = [gevent.spawn(task, i) for i in range(10)] 16 gevent.joinall(threads) 17 18 print('Synchronous:') 19 synchronous() 20 21 print('Asynchronous:') 22 asynchronous()
上面程式的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn
。 初始化的greenlet列表存放在數組threads
中,此數組被傳給gevent.joinall
函數,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。
5.2 遇到IO阻塞時會自動切換任務(gevent 庫中的 monkey 方法)--> 補丁
------->能夠最大程度監聽IO阻塞,提高效率。
1 from gevent import monkey 2 monkey.patch_all() 3 4 import gevent 5 from urllib.request import urlopen 6 7 def f(url): 8 print('GET: %s' % url) 9 resp = urlopen(url) 10 data = resp.read() 11 print('%d bytes received from %s.' % (len(data), url)) 12 13 gevent.joinall([ 14 gevent.spawn(f, 'https://www.python.org/'), 15 gevent.spawn(f, 'https://www.yahoo.com/'), 16 gevent.spawn(f, 'https://github.com/'), 17 ])