python 進程與線程是併發編程的兩種常見方式。進程是操作系統中的一個基本概念,表示程式在操作系統中的一次執行過程,擁有獨立的地址空間、資源、優先順序等屬性。線程是進程中的一條執行路徑,可以看做是輕量級的進程,與同一個進程中的其他線程共用相同的地址空間和資源。 ...
python 進程與線程是併發編程的兩種常見方式。進程是操作系統中的一個基本概念,表示程式在操作系統中的一次執行過程,擁有獨立的地址空間、資源、優先順序等屬性。線程是進程中的一條執行路徑,可以看做是輕量級的進程,與同一個進程中的其他線程共用相同的地址空間和資源。
線程和進程都可以實現併發編程,但是它們之間有幾點不同:
- 線程間共用進程的記憶體空間,但進程間的記憶體空間是相互獨立的;
- 線程創建和銷毀的開銷較小,但是線程切換的開銷較大;
- 進程間通信需要較為複雜的 IPC(Inter-Process Communication)機制,線程間通信則可以直接讀寫共用記憶體;
- 多進程可以充分利用多核 CPU 的性能,但是多線程受 GIL(Global Interpreter Lock)限制,只能利用單核 CPU 的性能。
在選擇使用進程還是線程時,需要根據具體場景和需求進行權衡和選擇。如果任務需要充分利用多核 CPU,且任務之間互不影響,可以選擇多進程;如果任務之間需要共用資源和數據,可以選擇多線程。同時,需要註意在 python 中使用多線程時,由於 GIL 的存在,可能無法實現真正的並行。
8.1 創建並使用線程
線程是操作系統調度的最小執行單元,是進程中的一部分,能夠提高程式的效率。在python中,創建線程需要使用threading模塊。該模塊的實現方法是底層調用了C語言的原生函數來實現線程的創建和管理。在系統中,所有的線程看起來都是同時執行的,但實際上是由操作系統進行時間片輪轉調度的。
使用函數創建線程: 創建線程並傳遞參數實現指定函數多線程併發,使用join
方法,等待線程執行完畢後的返回結果.
import os,time
import threading
now = lambda:time.time()
def MyThread(x,y): # 定義每個線程要執行的函數體
time.sleep(5) # 睡眠5秒鐘
print("傳遞的數據:%s,%s"%(x,y)) # 其中有兩個參數,我們動態傳入
if __name__ == "__main__":
ThreadPool = []
start = now()
for item in range(10): # 創建10個線程併發執行函數
thread = threading.Thread(target=MyThread,args=(item,item+1,)) # args =>函數的參數
thread.start() # 啟動線程
ThreadPool.append(thread)
for item in ThreadPool:
item.join()
print("[+] 線程信息: {}".format(item))
stop = now()
print("[+] 線程總耗時: {} s".format(int(stop-start)))
使用類創建內部線程: 通過定義類,將線程函數與類進行結合實現一體化該方式調用方便思維明確.
import os,time
import threading
class MyThread(threading.Thread):
def __init__(self,x,y):
super(MyThread, self).__init__()
self.x = x
self.y = y
def run(self): # 用於執行相應操作(固定寫法)
print("[+] 當前執行運算: {} + {}".format(self.x,self.y))
self.result = self.x + self.y
def get_result(self): # 獲取計算結果
try:
return self.result
except Exception:
return None
if __name__ == "__main__":
ThreadPool = []
for item in range(1,10):
obj = MyThread(item,item+1)
obj.start()
ThreadPool.append(obj)
for item in ThreadPool:
item.join()
print("[+] 獲取返回: ",item.get_result())
使用類創建外部線程: 該定義方式與上方完全不同,我們可以將執行過程定義到類的外部為單獨函數,然後類內部去調用傳參.
import os,time
import threading
def MyThreadPrint(x,y):
print("[+] 當前執行運算: {} + {}".format(x,y))
result = x + y
return result
class MyThread(threading.Thread):
def __init__(self,func,args=()):
super(MyThread, self).__init__()
self.func = func
self.args = args
def run(self):
self.result = self.func(*self.args)
def get_result(self):
try:
return self.result
except Exception:
return None
if __name__ == "__main__":
ThreadPool = []
for item in range(1,10):
obj = MyThread(func=MyThreadPrint,args=(item,item+1))
obj.start()
ThreadPool.append(obj)
for item in ThreadPool:
item.join()
print("[+] 獲取返回: ",item.get_result())
線上程中創建子線程: 通過創建一個守護線程,並讓守護線程調用子線程,從而實現線程中調用線程,線程嵌套調用.
import time
import threading
# run => 子線程 => 由主線程調用它
def run(num):
print("這是第 {} 個子線程".format(num))
time.sleep(2)
# main = > 主守護線程 => 在裡面運行5個子線程
def main():
for each in range(5):
thread = threading.Thread(target=run,args=(each,))
thread.start()
print("啟動子線程: {} 編號: {}".format(thread.getName(),each))
thread.join()
if __name__ == "__main__":
daemon = threading.Thread(target=main,args=())
daemon.setDaemon(True) # 設置主線程為守護線程
daemon.start() # 啟動守護線程
daemon.join(timeout=10) # 設置10秒後關閉,不論子線程是否執行完畢
簡單的線程互斥鎖(Semaphore): 同時允許一定數量的線程更改數據,也就是限制每次允許執行的線程數.
import threading,time
semaphore = threading.BoundedSemaphore(5) #最多允許5個線程同時運行
def run(n):
semaphore.acquire() #添加信號
time.sleep(1)
print("運行這個線程中: %s"%n)
semaphore.release() #關閉信號
if __name__ == '__main__':
for i in range(20): #同時執行20個線程
t = threading.Thread(target=run, args=(i,))
t.start()
while threading.active_count() != 1: #等待所有線程執行完畢
pass
else:
print('----所有線程執行完畢了---')
import threading,time
class mythreading(threading.Thread):
def run(self):
semaphore.acquire() #獲取信號量鎖
print('running the thread:',self.getName())
time.sleep(2)
semaphore.release() #釋放信號量鎖
if __name__ == "__main__":
semaphore = threading.BoundedSemaphore(3) # 只運行3個線程同時運行
for i in range(20):
t1 = mythreading()
t1.start()
t1.join()
線程全局鎖(Lock): 添加本全局鎖以後,能夠保證在同一時間內保證只有一個線程具有許可權.
import time
import threading
num = 0 #定義全局共用變數
thread_list = [] #線程列表
lock = threading.Lock() #生成全局鎖
def SumNumber():
global num #在每個線程中獲取這個全局變數
time.sleep(2)
lock.acquire() #修改數據前給數據加鎖
num += 1 #每次進行遞增操作
lock.release() #執行完畢以後,解除鎖定
for x in range(50): #指定生成線程數
thread = threading.Thread(target=SumNumber)
thread.start() #啟動線程
thread_list.append(thread) #將結果列表加入到變數中
for y in thread_list: #等待執行完畢.
y.join()
print("計算結果: ",num)
線程遞歸鎖(RLock): 遞歸鎖和全局鎖差不多,遞歸鎖就是在大鎖中還要添加個小鎖,遞歸鎖是常用的鎖.
import threading
import time
num = 0 #初始化全局變數
lock = threading.RLock() #設置遞歸鎖
def fun1():
lock.acquire() #添加遞歸鎖
global num
num += 1
lock.release() #關閉遞歸鎖
return num
def fun2():
lock.acquire() #添加遞歸鎖
res = fun1()
print("計算結果: ",res)
lock.release() #關閉遞歸鎖
if __name__ == "__main__":
for x in range(10): #生成10個線程
thread = threading.Thread(target=fun2)
thread.start()
while threading.active_count() != 1: #等待所有線程執行完成
print(threading.active_count())
else:
print("所有線程運行完成...")
print(num)
線程互斥鎖量控制併發: 使用BoundedSemaphore
定義預設信號10,既可以實現控制單位時間內的程式併發量.
import os,time
import threading
def MyThread(x):
lock.acquire() # 上鎖
print("執行數據: {}".format(x))
lock.release() # 釋放鎖
time.sleep(2) # 模擬函數消耗時間
threadmax.release() # 釋放信號,可用信號加1
if __name__ == "__main__":
# 此處的BoundedSemaphore就是說預設給與10個信號
threadmax = threading.BoundedSemaphore(10) # 限制線程的最大數量為10個
lock = threading.Lock() # 將鎖內的代碼串列化(防止print輸出亂行)
ThreadPool = [] # 執行線程池
for item in range(1,100):
threadmax.acquire() # 增加信號,可用信號減1
thread = threading.Thread(target=MyThread,args=(item,))
thread.start()
ThreadPool.append(thread)
for item in ThreadPool:
item.join()
線程驅動事件(Event): 線程事件用於主線程式控制制其他線程的執行,事件主要提供了三個方法set、wait、clear、is_set
,分別用於設置檢測和清除標誌.
import threading
event = threading.Event()
def func(x,event):
print("函數被執行了: %s 次.." %x)
event.wait() # 檢測標誌位狀態,如果為True=繼續執行以下代碼,反之等待.
print("載入執行結果: %s" %x)
for i in range(10): # 創建10個線程
thread = threading.Thread(target=func,args=(i,event,))
thread.start()
print("當前狀態: %s" %event.is_set()) # 檢測當前狀態,這裡為False
event.clear() # 將標誌位設置為False,預設為False
temp=input("輸入yes解鎖新姿勢: ") # 輸入yes手動設置為True
if temp == "yes":
event.set() # 設置成True
print("當前狀態: %s" %event.is_set()) # 檢測當前狀態,這裡為True
import threading
def show(event):
event.wait() # 阻塞線程執行程式
print("執行一次線程操作")
if __name__ == "__main__":
event_obj = threading.Event() # 創建event事件對象
for i in range(10):
t1 = threading.Thread(target=show,args=(event_obj,))
t1.start()
inside = input('>>>:')
if inside == '1':
event_obj.set() # 當用戶輸入1時set全局Flag為True,線程不再阻塞
event_obj.clear() # 將Flag設置為False
線程實現條件鎖: 條件(Condition) 使得線程等待,只有滿足某條件時,才釋放N個線程.
import threading
def condition_func():
ret = False
inp = input(">> ")
if inp == '1':
ret = True
return ret
def run(n):
con.acquire() # 條件鎖
con.wait_for(condition_func) # 判斷條件
print('running...',n)
con.release() # 釋放鎖
if __name__ == "__main__":
con = threading.Condition() # 建立線程條件對象
for i in range(10):
t = threading.Thread(target=run,args=(i,))
t.start()
t.join()
單線程非同步併發執行: 在單線程下實現非同步執行多個函數,返回耗時取決於最後一個函數的執行時間.
import time,asyncio
now = lambda :time.time()
async def GetSystemMem(sleep):
print("[+] 執行獲取記憶體非同步函數.")
await asyncio.sleep(sleep) # 設置等待時間
return 1
async def GetSystemCPU(sleep):
print("[+] 執行獲取CPU非同步函數.")
await asyncio.sleep(sleep) # 設置等待時間
return 1
if __name__ == "__main__":
stop = now()
mem = GetSystemMem(1)
cpu = GetSystemCPU(4)
task=[
asyncio.ensure_future(mem), # 將多個任務添加進一個列表
asyncio.ensure_future(cpu)
]
loop=asyncio.get_event_loop() # 創建一個事件迴圈
loop.run_until_complete(asyncio.wait(task)) # 開始併發執行
for item in task:
print("[+] 返回結果: ",item.result()) # 輸出回調
print('總耗時: {}'.format(stop - now()))
8.2 創建並使用進程
進程是指正在執行的程式,創建進程需要使用multiprocessing
模塊,創建方法和線程相同,但由於進程之間的數據需要各自持有一份,所以創建進程需要更大的開銷。進程間數據不共用,多進程可以用來處理多任務,但很消耗資源。計算密集型任務最好交給多進程來處理,I/O密集型任務最好交給多線程來處理。另外,進程的數量應該和CPU的核心數保持一致,以充分利用系統資源。
使用進程函數執行命令: 通過系統提供的進程線程函數完成對系統命令的調用與執行.
>>> import os,subprocess
>>>
>>> os.system("ping -n 1 www.baidu.com") # 在當前shell中執行命令
>>>
>>> ret = os.popen("ping -n 1 www.baidu.com") # 在子shell中執行命令
>>> ret.read()
>>>
>>> subprocess.run("ping www.baidu.com",shell=True)
>>> subprocess.call("ping www.baidu.com", shell=True)
>>>
>>> ret = subprocess.Popen("ping www.baidu.com",shell=True,stdout=subprocess.PIPE)
>>> ret.stdout.read()
創建多進程與子線程: 通過使用multiprocessing庫,迴圈創建4個主進程,而在每個主進程內部又起了5個子線程.
import multiprocessing
import threading,os
def ThreadingFunction():
print("[-] ----> 子線程PPID: {}".format(threading.get_ident()))
def ProcessFunction(number):
print("[*] -> 主進程PID: {} 父進程: {}".format(os.getpid(),os.getppid()))
for i in range(5): # 在主進程里開闢5個線程
thread = threading.Thread(target=ThreadingFunction,) # 嵌套子線程
thread.start() # 執行子線程
if __name__ == "__main__":
for item in range(4): # 啟動4個主進程
proc = multiprocessing.Process(target=ProcessFunction,args=(item,))
proc.start()
proc.join()
使用基於類的方式創建進程: 除了使用函數式方式創建進程以外,我們還可以使用基於類的方式創建.
import os,time
from multiprocessing import Process
class Myprocess(Process):
def __init__(self,person):
super().__init__()
self.person = person
def run(self):
print("[*] -> 當前PID: {}".format(os.getpid()))
print("--> 傳入的人名: {}".format(self.person))
time.sleep(3)
if __name__ == '__main__':
process = Myprocess("lyshark")
#process.daemon = True # 設置p為守護進程
process.start()
進程鎖(Lock): 進程中也有鎖,可以實現進程之間數據的一致性,也就是進程數據的同步,保證數據不混亂.
# 由併發變成了串列,犧牲了運行效率,但避免了競爭
import multiprocessing
def func(loc,num):
loc.acquire() #添加進程鎖
print("hello ---> %s" %num)
loc.release() #關閉進程鎖
if __name__ == "__main__":
lock = multiprocessing.Lock() #生成進程鎖
for number in range(10):
proc = multiprocessing.Process(target=func,args=(lock,number,))
proc.start()
非同步進程池: 進程池內部維護一個進程式列,當使用時則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那麼程式就會等待,直到進程池中有可用進程為止.
import multiprocessing
import time
def ProcessFunction(number):
time.sleep(2)
print("[+] 進程執行ID: {}".format(number))
def ProcessCallback(arg):
print("[-] 進程執行結束,執行回調函數")
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=5) # 允許進程池同時放入5個進程
for item in range(10):
pool.apply_async(func=ProcessFunction,args=(item,),callback=ProcessCallback)
pool.close()
pool.join()
from multiprocessing import Pool, TimeoutError
import time,os
def f(x):
return x*x
if __name__ == '__main__':
#啟動4個工作進程作為進程池
with Pool(processes=4) as pool:
#返回函數參數運行結果列表
print(pool.map(f, range(10)))
#在進程池中以任意順序列印相同的數字
for i in pool.imap_unordered(f, range(10)):
print(i,end=' ')
#非同步評估
res = pool.apply_async(f,(20,)) #在進程池中只有一個進程運行
print('\n',res.get(timeout=1)) #列印結果,超時為1秒
#列印該進程的PID
res = pool.apply_async(os.getpid,()) #在進程池中只有一個進程運行
print(res.get(timeout=1)) #列印進程PID
#列印4個進程的PID
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
#進程等待10秒,獲取數據超時為1秒,將輸出異常
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
8.3 多進程數據共用
一般當我們創建兩個進程後,進程各自持有一份數據,預設無法共用數據,如果我們想要共用數據必須通過一個中間件來實現數據的交換,來幫你把數據進行一個投遞,要實現進程之間的數據共用,其主要有以下幾個方法來實現進程間數據的共用.
共用隊列(Queue): 這個Queue主要實現進程與進程之間的數據共用,與線程中的Queue不同.
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
def foo(i,arg):
arg.put(i)
print('say hi',i,arg.qsize())
li = queues.Queue(20,ctx=multiprocessing)
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
共用整數(int): 整數之間的共用,只需要使用multiprocessing.Value
方法,即可實現.
import multiprocessing
def func(num):
num.value = 1024 #雖然賦值了,但是子進程改變了這個數值
print("函數中的數值: %s"%num.value)
if __name__ == "__main__":
num = multiprocessing.Value("d",10.0) #主進程與子進程共用這個value
print("這個共用數值: %s"%num.value)
for i in range(5):
num = multiprocessing.Value("d", i) #聲明進程,並傳遞1,2,3,4這幾個數
proc = multiprocessing.Process(target=func,args=(num,))
proc.start() #啟動進程
#proc.join()
print("最後列印數值: %s"%num.value)
共用數組(Array): 數組之間的共用,只需要使用multiprocessing.Array
方法,即可實現.
import multiprocessing
def func(ary): #子進程改變數組,主進程跟著改變
ary[0]=100
ary[1]=200
ary[2]=300
''' i所對應的類型是ctypes.c_int,其他類型如下參考:
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
'''
if __name__ == "__main__":
ary = multiprocessing.Array("i",[1,2,3]) #主進程與子進程共用這個數組
for i in range(5):
proc = multiprocessing.Process(target=func,args=(ary,))
print(ary[:])
proc.start()
共用字典(dict): 通過使用Manager方法,實現兩個進程中的,字典與列表的數據共用.
import multiprocessing
def func(mydict, mylist):
mydict["字典1"] = "值1"
mydict["字典2"] = "值2"
mylist.append(1)
mylist.append(2)
mylist.append(3)
if __name__ == "__main__":
mydict = multiprocessing.Manager().dict() #主進程與子進程共用字典
mylist = multiprocessing.Manager().list() #主進程與子進程共用列表
proc = multiprocessing.Process(target=func,args=(mydict,mylist))
proc.start()
proc.join()
print("列表中的元素: %s" %mylist)
print("字典中的元素: %s" %mydict)
管道共用(Pipe): 通過Pipe
管道的方式在兩個進程之間共用數據,類似於Socket套接字.
import multiprocessing
def func(conn):
conn.send("你好我是子進程.") #發送消息給父進程
print("父進程傳來了:",conn.recv()) #接收父進程傳來的消息
conn.close()
if __name__ == "__main__":
parent_conn,child_conn = multiprocessing.Pipe() #管道創建兩個埠,一收一發送
proc = multiprocessing.Process(target=func,args=(child_conn,))
proc.start()
print("子進程傳來了:",parent_conn.recv()) #接收子進程傳來的數據
parent_conn.send("我是父進程,收到消息了..") #父進程發送消息給子進程
本文作者: 王瑞
本文鏈接: https://www.lyshark.com/post/b4dd0803.html
版權聲明: 本博客所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!