關於進程的複習: 線程: Threading模塊的其它方法: 守護線程 線程鎖: 互斥鎖 遞歸鎖 ...
關於進程的複習:
# 管道 # 數據的共用 Manager dict list # 進程池 # cpu個數+1 # ret = map(func,iterable) # 非同步 自帶close和join # 所有結果的[] # apply # 同步的:只有當func執行完之後,才會繼續向下執行其他代碼 # ret = apply(func,args=()) # 返回值就是func的return # apply_async # 非同步的:當func被註冊進入一個進程之後,程式就繼續向下執行 # apply_async(func,args=()) # 返回值 : apply_async返回的對象obj # 為了用戶能從中獲取func的返回值obj.get() # get會阻塞直到對應的func執行完畢拿到結果 # 使用apply_async給進程池分配任務, # 需要先close後join來保持多進程和主進程代碼的同步性
# 回調函數是在主進程中執行的 from multiprocessing import Pool def func1(n): return n+1 def func2(m): print(m) if __name__ == '__main__': p = Pool(5) for i in range(10,20): p.apply_async(func1,args=(i,),callback=func2) p.close() p.join()
import requests from urllib.request import urlopen from multiprocessing import Pool ## 爬取位元組數的例子: def get(url): response = requests.get(url) if response.status_code == 200: return url,response.content.decode('utf-8') # def get_urllib(url): # ret = urlopen(url) # return ret.read().decode('utf-8') def call_back(args): url,content = args print(url,len(content)) if __name__ == '__main__': url_lst = [ 'https://www.cnblogs.com/', 'http://www.baidu.com', 'https://www.sogou.com/', 'http://www.sohu.com/', ] p = Pool(5) for url in url_lst: p.apply_async(get,args=(url,),callback=call_back) p.close() p.join()
線程:
import os import time from threading import Thread ## 多線程併發 # def func(a, b): # n = a + b # print(n, os.getpid()) # 都在一個進程中 # # print('主線程', os.getpid()) # 都在一個進程中 # # for i in range(10): # t = Thread(target=func, args=(i, 6)) # t.start() ## 同一進程中的各個線程,都可以共用該進程所擁有的資源 # def func(a,b): # global g # g = 0 # print(g,os.getpid()) # # g = 100 # t_lst = [] # for i in range(10): # t = Thread(target=func,args=(i,5)) # t.start() # t_lst.append(t) # for t in t_lst : t.join() # print(g) ## 繼承 類 實現 # class MyThread(Thread): # # 重寫初始化方法 # def __init__(self,arg): # super().__init__() # self.arg = arg # # def run(self): # time.sleep(1) # print(':::',self.arg) # # t = MyThread(10) # t.start() # class Sayhi(Thread): # def __init__(self,name): # super().__init__() # self.name=name # # def run(self): # time.sleep(1) # print('%s say hello' % self.name) # # if __name__ == '__main__': # t = Sayhi('egon') # t.start() # print('主線程') # https://www.cnblogs.com/Eva-J/articles/8306047.html # 進程 是 最小的 記憶體分配單位 # 線程 是 操作系統調度的最小單位 # 線程直接被CPU執行,進程內至少含有一個線程,也可以開啟多個線程 # 開啟一個線程所需要的時間要遠遠小於開啟一個進程 # 多個線程內部有自己的數據棧,數據不共用 # 全局變數在多個線程之間是共用的 # GIL鎖(即全局解釋器鎖) # 在Cpython解釋器下的python程式 在同一時刻 多個線程中只能有一個線程被CPU執行 # 高CPU : 計算類 --- 高CPU利用率 # 如果真的需要高併發,可使用多進程,避免多線程GIL鎖 # 高IO : 一般程式都不會受GIL影響. 爬取網頁 200個網頁 # qq聊天 send recv # 處理日誌文件 讀文件 # 處理web請求 # 讀資料庫 寫資料庫 import time from threading import Thread from multiprocessing import Process def func(n): n + 1 if __name__ == '__main__': start = time.time() t_lst = [] for i in range(100): t = Thread(target=func,args=(i,)) t.start() t_lst.append(t) for t in t_lst:t.join() t1 = time.time() - start ## 證明線程比進程快的例子: start = time.time() t_lst = [] for i in range(100): t = Process(target=func, args=(i,)) t.start() t_lst.append(t) for t in t_lst: t.join() t2 = time.time() - start print(t1,t2)
Threading模塊的其它方法:
import time import threading ## Thread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 def wahaha(n): time.sleep(0.5) print(n,threading.current_thread(),threading.get_ident()) # print(threading.current_thread().getName()) for i in range(10): threading.Thread(target=wahaha,args=(i,)).start() # 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。 print(threading.active_count()) # 11 print(threading.current_thread()) # 返回當前的線程變數。 # 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。 print(threading.enumerate()) ## https://www.cnblogs.com/Eva-J/articles/8306047.html
守護線程
import time from threading import Thread def func1(): while True: print('*'*10) time.sleep(1) def func2(): print('in func2') time.sleep(5) t = Thread(target=func1,) t.daemon = True t.start() t2 = Thread(target=func2,) t2.start() t2.join() # 等待t2執行完畢 print('主線程') # 守護進程隨著主進程代碼的執行結束而結束 # 守護線程會在主線程結束之後等待其他子線程的結束才結束 # 主進程在執行完自己的代碼之後不會立即結束 而是等待子進程結束之後 回收子進程的資源 #1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),然後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(否則會產生僵屍進程),才會結束, #2 主線程在其他非守護線程運行完畢後才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味著進程的結束,進程整體的資源都將被回收,
而進程必須保證非守護線程都運行完畢後才能結束。
線程鎖: 互斥鎖 遞歸鎖
import time from threading import Lock,Thread '''由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之後,當多個線程同時修改同一條數據時可能會出現臟數據, 所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。 ''' ### Lock 互斥鎖 只有一把鑰匙 # def func(lock): # global n # lock.acquire() # 加鎖 # temp = n # time.sleep(0.2) # n = temp - 1 # lock.release() # # n = 10 # t_lst = [] # lock = Lock() # for i in range(10):lkqi # t = Thread(target=func,args=(lock,)) # 線程鎖 # t.start() # t_lst.append(t) # # for t in t_lst: t.join() # print(n) ### 科學家吃面 問題 造成死鎖 # noodle_lock = Lock() # fork_lock = Lock() # def eat1(name): # noodle_lock.acquire() # print('%s拿到麵條啦'%name) # fork_lock.acquire() # print('%s拿到叉子了'%name) # print('%s吃面'%name) # fork_lock.release() # noodle_lock.release() # # def eat2(name): # fork_lock.acquire() # print('%s拿到叉子了'%name) # time.sleep(1) # noodle_lock.acquire() # print('%s拿到麵條啦'%name) # print('%s吃面'%name) # noodle_lock.release() # fork_lock.release() # # Thread(target=eat1,args=('alex',)).start() # Thread(target=eat2,args=('Egon',)).start() # Thread(target=eat1,args=('bossjin',)).start() # Thread(target=eat2,args=('nezha',)).start() ## 當在同一個進程或線程中用到2把以上的鎖時,就容易產生死鎖。 from threading import RLock # 遞歸鎖 可以多次使用 解決死鎖問題 fork_lock = noodle_lock = RLock() # 類似於一串上的兩把鑰匙 def eat1(name): noodle_lock.acquire() # 一把鑰匙 print('%s拿到麵條啦'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s吃面'%name) fork_lock.release() noodle_lock.release() # 必須全部釋放之後,其它人才能用。 def eat2(name): fork_lock.acquire() print('%s拿到叉子了'%name) time.sleep(1) noodle_lock.acquire() print('%s拿到麵條啦'%name) print('%s吃面'%name) noodle_lock.release() fork_lock.release() Thread(target=eat1,args=('alex',)).start() Thread(target=eat2,args=('Egon',)).start() Thread(target=eat1,args=('bossjin',)).start() Thread(target=eat2,args=('nezha',)).start()
線程的信號量:
# import time # from threading import Semaphore,Thread # def func(sem,a,b): # sem.acquire() # time.sleep(1) # print(a+b) # sem.release() # # sem = Semaphore(4) # for i in range(10): # t = Thread(target=func,args=(sem,i,i+5)) # t.start() '''互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 , 比如廁所有3個坑,那最多只允許3個人上廁所,後面的人只能等裡面有人出來了才能再進去。 ''' import threading, time def run(n): semaphore.acquire() time.sleep(1) print("run the thread: %s" % n) semaphore.release() if __name__ == '__main__': num = 0 semaphore = threading.BoundedSemaphore(3) # 最多允許5個線程同時運行 for i in range(12): t = threading.Thread(target=run, args=(i,)) t.start()
線程的事件:
# !/usr/bin/env python # -*- coding:utf-8 -*- '''線程的事件用於主線程式控制制其他線程的執行,事件主要提供了三個方法 set、wait、clear。 事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞, 如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。 ''' import threading def do(event): print('start.') event.wait() print('execute') event_obj = threading.Event() # 預設False for i in range(6): t = threading.Thread(target=do,args=(event_obj,)) t.start() event_obj.clear() # 設為False input2 = input('>>>') if input2 == 'true': event_obj.set() # 設為 True # 事件被創建的時候 # False狀態 # wait() 阻塞 # True狀態 # wait() 非阻塞 # clear 設置狀態為False # set 設置狀態為True
定時器 Timer
import time from threading import Timer def func(): print('時間同步') #1-3 while True: t = Timer(5,func).start() # 非阻塞的 5秒之後開始 time.sleep(2) # 定時器,指定n秒後執行某操作
更多內容,參考:http://www.cnblogs.com/wupeiqi/articles/5040827.html
線程隊列:
import queue
# 線程的隊列,內置了鎖,保證數據安全
# q = queue.Queue() # 隊列 先進先出
# q.put(123)
# print(q.get())
# q.put_nowait(456)
# print(q.get_nowait())
# q = queue.LifoQueue() # 棧 先進後出
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())
# print(q.get())
q = queue.PriorityQueue() # 優先順序隊列
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
q.put((-5,'f'))
q.put((-5,'d'))
q.put((1,'?'))
print(q.get()) # 數字越小,優先順序越高 按ascii碼順序
線程池:https://www.cnblogs.com/Eva-J/articles/8306047.html#_label17