一、線程 多任務可以由多進程完成,也可以由一個進程內的多線程完成,一個進程內的所有線程,共用同一塊記憶體python中創建線程比較簡單,導入threading模塊,下麵來看一下代碼中如何創建多線程。 主線程從上到下執行,創建5個子線程,列印出'start',然後等待子線程執行完結束,如果想讓線程要一個 ...
一、線程
多任務可以由多進程完成,也可以由一個進程內的多線程完成,一個進程內的所有線程,共用同一塊記憶體python中創建線程比較簡單,導入threading模塊,下麵來看一下代碼中如何創建多線程。
def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.start() print('start') # 主線程等待子線程完成,子線程併發執行 >>start >>2 >>1 >>3 >>0 >>4
主線程從上到下執行,創建5個子線程,列印出'start',然後等待子線程執行完結束,如果想讓線程要一個個依次執行完,而不是併發操作,那麼就要使用join方法。下麵來看一下代碼
import threading import time def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.start() t.join() print('start') # 線程從上到下依次執行,最後列印出start >>0 >>1 >>2 >>3 >>4 >>start
上面的代碼不適用join的話,主線程會預設等待子線程結束,才會結束,如果不想讓主線程等待子線程的話,可以子線程啟動之前設置將其設置為後臺線程,如果是後臺線程,主線程執行過程中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均停止,前臺線程則相反,若果不加指定的話,預設為前臺線程,下麵從代碼來看一下,如何設置為後臺線程。例如下麵的例子,主線程直接列印start,執行完後就結束,而不會去等待子線程,子線程中的數據也就不會列印出來
import threading import time def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.setDaemon(True) t.start() print('start') # 主線程不等待子線程 >> start
除此之外,自己還可以為線程自定義名字,通過 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name參數,除此之外,Thread還有一下一些方法
- t.getName() : 獲取線程的名稱
- t.setName() : 設置線程的名稱
- t.name : 獲取或設置線程的名稱
- t.is_alive() : 判斷線程是否為激活狀態
- t.isAlive() :判斷線程是否為激活狀態
- t.isDaemon() : 判斷是否為守護線程
二、線程鎖
由於線程是共用同一份記憶體的,所以如果操作同一份數據,很容易造成衝突,這時候就可以為線程加上一個鎖了,這裡我們使用Rlock,而不使用Lock,因為Lock如果多次獲取鎖的時候會出錯,而RLock允許在同一線程中被多次acquire,但是需要用n次的release才能真正釋放所占用的瑣,一個線程獲取了鎖在釋放之前,其他線程只有等待。
import threading G = 1 lock = threading.RLock() def fun(): lock.acquire() # 獲取鎖 global G G += 2 print(G, threading.current_thread().name) lock.release() # 釋放鎖 return for i in range(10): t = threading.Thread(target=fun, name='t-{}'.format(i)) t.start() 3 t-0 5 t-1 7 t-2 9 t-3 11 t-4 13 t-5 15 t-6 17 t-7 19 t-8 21 t-9
三、線程間通信Event
Event是線程間通信最間的機制之一,主要用於主線程式控制制其他線程的執行,主要用過wait,clear,set,這三個方法來實現的的,下麵來看一個簡單的例子,
import threading import time def f1(event): print('start:') event.wait() # 阻塞在,等待 set print('end:') if __name__ == '__main__': event_obj = threading.Event() for i in range(5): t = threading.Thread(target=f1, args=(event_obj,)) t.start() event_obj.clear() # 清除標誌位 inp = input('>>>>:') if inp == 'true': event_obj.set() # 設置標誌位
四、隊列
可以簡單的理解為一種先進先出的數據結構,比如用於生產者消費者模型,或者用於寫線程池,以及前面寫select的時候,讀寫分離時候可用隊列存儲數據等等,以後用到隊列的地方很多,因此對於隊列的用法要熟練掌握。下麵首先來看一下隊列提供了哪些用法
q = queue.Queue(maxsize=0) # 構造一個先進顯出隊列,maxsize指定隊列長度,為0時,表示隊列長度無限制。 q.join() # 等到隊列為kong的時候,在執行別的操作 q.qsize() # 返回隊列的大小 (不可靠) q.empty() # 當隊列為空的時候,返回True 否則返回False (不可靠) q.full() # 當隊列滿的時候,返回True,否則返回False (不可靠) q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,參數block預設為True,表示當隊列滿時,會等待 # 為False時為非阻塞,此時如果隊列已滿,會引發queue.Full 異常。 可選參數timeout,表示會阻塞設置的時間, # 如果在阻塞時間里 隊列還是無法放入,則引發 queue.Full 異常 q.get(block=True, timeout=None) # 移除並返回隊列頭部的一個值,可選參數block預設為True,表示獲取值的時候,如果隊列為空,則阻塞 # 阻塞的話若此時隊列為空,則引發queue.Empty異常。 可選參數timeout,表示會阻塞設置的時間, q.get_nowait() # 等效於 get(item,block=False)
下麵用代碼來簡單的演示下,消費者生成者模型,只是簡單的演示下。
message = queue.Queue(10) def product(num): for i in range(num): message.put(i) print('將{}添加到隊列中'.format(i)) time.sleep(random.randrange(0, 1)) def consume(num): count = 0 while count<num: i = message.get() print('將{}從隊列取出'.format(i)) time.sleep(random.randrange(1, 2)) count += 1 t1 = threading.Thread(target=product, args=(10, )) t1.start() t2 = threading.Thread(target=consume, args=(10, )) t2.start()
五、進程
線程的上一級就是進程,進程可包含很多線程,進程和線程的區別是進程間的數據不共用,多進程也可以用來處理多任務,不過多進程很消耗資源,計算型的任務最好交給多進程來處理,IO密集型最好交給多線程來處理,此外進程的數量應該和cpu的核心說保持一致。
在windows中不能用fork來創建多進程,因此只能導入multiprocessing,來模擬多進程,下麵首先來看一下怎麼創建進程,大家可以先猜一下下麵的結果是什麼
l = [] def f(i): l.append(i) print('hi', l) if __name__ == '__main__': for i in range(10): p = multiprocessing.Process(target=f, args=(i,)) # 數據不共用,創建10份 l列表 p.start()
六、進程間數據共用
進程間的數據是不共用的,但是我如果非要數據共用了,那麼就需要用其他方式了
1、Value,Array
def f(a, b): a.value = 3.111 for i in range(len(b)): b[i] += 100 if __name__ == '__main__': num = Value('f', 3.333) # 類似C語言中的 浮點型數 l = Array('i', range(10)) # 類似C語言中的整形數組,長度為10 print(num.value) print(l[:]) p = Process(target=f, args=(num, l)) p.start() p.join() print(num.value) # 大家自己運行一下,看下兩次列印結果是否一樣 print(l[:])
2、manage
方式一,使用的都是C語言中的數據結構,如果大家對c不熟悉的話,用起來比較麻煩,方式2就可以支持python自帶的數據,下麵來看一下
from multiprocessing import Process,Manager def Foo(dic, i): dic[i] = 100 + i print(dic.values()) if __name__ == '__main__': manage = Manager() dic = manage.dict() for i in range(2): p = Process(target=Foo, args=(dic, i)) p.start() p.join()
七、進程池
實際應用中,並不是每次執行任務的時候,都去創建多進程,而是維護了一個進程池,每次執行的時候,都去進程池取一個,如果進程池裡面的進程取光了,就會阻塞在那裡,直到進程池中有可用進程為止。首先來看一下進程池提供了哪些方法
-
apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,由於這個原因,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。如果callback被指定,那麼callback可以接收一個參數然後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被立即完成,否則處理結果的線程會被阻塞。
-
close() : 等待任務完成後在停止工作進程,阻止更多的任務提交到pool,待任務完成後,工作進程會退出。
-
terminate() : 不管任務是否完成,立即停止工作進程。在對pool對象進程垃圾回收的時候,會立即調用terminate()。
-
join() : 等待工作線程的退出,在調用join()前,必須調用close() or terminate()。這樣是因為被終止的進程需要被父進程調用wait(join等價與wait,否則進程會成為僵屍進程。
下麵來簡單的看一下代碼怎麼用的
from multiprocessing import Pool import time def f1(i): time.sleep(1) # print(i) return i def cb(i): print(i) if __name__ == '__main__': poo = Pool(5) for i in range(20): # poo.apply(func=f1, args=(i,)) # 串列執行,排隊執行 有join poo.apply_async(func=f1, args=(i,), callback=cb) # 併發執行 主進程不等子進程,無join print('**********') poo.close() poo.join()
八、線程池
對於前面的進程池,python自帶了一個模塊Pool供我們使用,但是對於線程池,則沒有提供,因此需要我們自己寫,自己寫的話,就需要用到隊列,下麵我們來看一下自己怎麼實現一個線程池,首先寫一個最簡單的版本。
import threading import time import queue class ThreadPool: def __init__(self, max_num=20): self.queue = queue.Queue(max_num) for i in range(max_num): self.add() def add(self): self.queue.put(threading.Thread) def get(self): return self.queue.get() def f(tp, i): time.sleep(1) print(i) tp.add() p = ThreadPool(10) for i in range(20): thread = p.get() t = thread(target=f, args=(p, i)) t.start()
上述代碼寫了一個線程池類,基本實現了線程池的功能,但是有很多缺點,沒有實現回掉函數,每次執行任務的時候,任務處理函數每次執行完都需要自動執行對象的add方法,將線程對象添加到隊列中去,而且類初始化的時候,一次性將所有的線程類都添加到隊列中去了,總之上面的線程池雖然實現簡單,但是實際上卻有很多問題,下麵來看一個真正意義上的線程池。
在寫代碼之前,我們先來看一下該怎麼設計這樣一個線程池,上面的線程池,我們的隊列中,存的是線程類,我們每處理一個任務都實例化一個線程,然後執行完了之後,該線程就被丟棄了,這樣有點不合適。我們這次設計的時候,
- 隊列中存的不是線程類,而是任務,我們從隊列中拿取的都是任務
- 每次執行任務的時候,不是都要生成一個線程,而是如果以前生成的線程有空閑的話,就用以前的線程
- 支持回掉機制,支持close,terminate
下麵來一下代碼是怎麼實現的
import threading import queue import time import contextlib class ThreadingPool: def __init__(self, num): self.max = num self.terminal = False self.q = queue.Queue() self.generate_list = [] # 保存已經生成的線程 self.free_list = [] # 保存那些已經完成任務的線程 def run(self, func, args=None, callbk=None): self.q.put((func, args, callbk)) # 將任務信息作為一個元祖放到隊列中去 if len(self.free_list) == 0 and len(self.generate_list) < self.max: self.threadstart() def threadstart(self): t = threading.Thread(target=self.handel) t.start() def handel(self): current_thread = threading.current_thread() self.generate_list.append(current_thread) event = self.q.get() while event != 'stop': func, args, callbk = event flag = True try: ret = func(*args) except Exception as e: flag = False ret = e if callbk is not None: try: callbk(ret) except Exception as e: pass if not self.terminal: with self.auto_append_remove(current_thread): event = self.q.get() else: event = 'stop' else: self.generate_list.remove(current_thread) def terminate(self): self.terminal = True while self.generate_list: self.q.put('stop') self.q.empty() def close(self): num = len(self.generate_list) while num: self.q.put('stop') num -= 1 @contextlib.contextmanager def auto_append_remove(self, thread): self.free_list.append(thread) try: yield finally: self.free_list.remove(thread) def f(i): # time.sleep(1) return i def f1(i): print(i) p = ThreadingPool(5) for i in range(20): p.run(func=f, args=(i,), callbk=f1) p.close()
九、協程
協程,又稱微線程,協程執行看起來有點像多線程,但是事實上協程就是只有一個線程,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯,此外因為只有一個線程,不需要多線程的鎖機制,也不存在同時寫變數衝突。協程的適用場景:當程式中存在大量不需要CPU的操作時(IO)下麵來看一個利用協程例子
from gevent import monkey import gevent import requests # 把標準庫中的thread/socket等給替換掉 # 這樣我們在後面使用socket的時候可以跟平常一樣使用,無需修改任何代碼,但是它變成非阻塞的了. monkey.patch_all() # 猴子補丁 def f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
上面的例子,利用協程,一個線程完成所有的請求,發出請求的時候,不會等待回覆,而是一次性將所有的請求都發出求,收到一個回覆就處理一個回覆,這樣一個線程就解決了所有的事情,效率極高。
十、小結
這篇博文是pyton基礎知識的最後一篇,後面會講的博文會講開始講前端的知識,這裡附上目錄http://www.cnblogs.com/Wxtrkbc/p/5606048.html,以後會繼續更新的,