一、進程 1、多任務原理 多任務是指操作系統同時可以運行多個任務。 單核CPU實現多任務原理:操作系統輪流讓各個任務交替執行; 多核CPU實現多任務原理:真正的執行多任務只能在多核CPU上實現,多出來的任務輪流調度到每個核心上執行。 併發:看上去一起執行,任務數多於CPU核心數; 並行:真正的一起執 ...
一、進程
1、多任務原理
多任務是指操作系統同時可以運行多個任務。
- 單核CPU實現多任務原理:操作系統輪流讓各個任務交替執行;
- 多核CPU實現多任務原理:真正的執行多任務只能在多核CPU上實現,多出來的任務輪流調度到每個核心上執行。
- 併發:看上去一起執行,任務數多於CPU核心數;
- 並行:真正的一起執行,任務數小於等於CPU核心數。
實現多任務的方式:
1、多進程模式
2、多線程模式
3、協程模式
4、多進程+多線程模式
2、進程
對於操作系統而言,一個任務就是一個進程;
進程是系統中程式執行和資源分配的基本單元,每個進程都有自己的數據段、代碼段、堆棧段。
下麵是一小段程式,一個單任務的例子。在其中,有兩個輸出語句分別在在兩個不同的迴圈當中,單任務的執行方式,也就是最初學習時,當一個迴圈沒有結束的時候,無法執行到下麵的程式當中。如果想要讓兩個迴圈可以同時在執行,就是在實現多任務,當然不是說同時輸出,而是兩個迴圈都在執行著。
1 from time import sleep
2 # 只能執行到那一個迴圈,執行不了run,所以叫單任務
3 def run():
4 while True:
5 print("&&&&&&&&&&&&&&&")
6 sleep(1.2)
7
8 if __name__ == "__main__":
9 while True:
10 print("**********")
11 sleep(1)
12 run()
接下來啟用多任務,通過進程來實現。
multiprocessing庫:跨平臺版本的多進程模塊,提供了一個Process類來代表一個進程對象(fork僅適用於Linux)。
下麵的程式是在一個父進程中創建一個子進程,讓父進程和子進程可以都在執行,創建方式程式中已經很簡潔了。可以自己把這兩段程式複製下來運行一下,看看輸出的效果。
1 from multiprocessing import Process
2 from time import sleep
3 import os
4
5 def run(str):
6 # os.getpid()獲取當前進程id號
7 # os.getppid()獲取當前進程的父進程id號
8 while True:
9 print("&&&&&&&&&&&&&&&%s--%s--%s" % (str, os.getpid(), os.getppid()))
10 sleep(0.5)
11
12 if __name__ == "__main__":
13 print("主(父)進程啟動 %s" % (os.getpid()))
14 # 創建子進程
15 # target說明進程執行的任務
16 p = Process(target=run, args=("nice",))
17 # 啟動進程
18 p.start()
19
20 while True:
21 print("**********")
22 sleep(1)
我想第一個單任務的程式就不必說了吧,就是一個死迴圈,一直沒有執行到下麵的run函數。第二段程式是通過多進程實現的多任務,兩個迴圈都能執行到,我把結果截圖放下麵,最好自己去試一下。
3、父子進程的先後順序
上面的多進程的例子中輸出了那麼多,我們使用的時候究竟是先執行哪個後執行哪個呢?根據我們的一般思維來說,我們寫的主函數其實就是父進程,在主函數中間,要調用的也就是子進程。
1 from multiprocessing import Process
2 from time import sleep
3 import os
4
5 def run():
6 print("啟動子進程")
7 print("子進程結束")
8 sleep(3)
9
10 if __name__ == "__main__":
11 print("父進程啟動")
12 p = Process(target=run)
13 p.start()
14
15 # 父進程的結束不能影響子進程,讓進程等待子進程結束再執行父進程
16 p.join()
17
18 print("父進程結束")
4、全局變數在多個進程中不能共用
在多進程的程式當中定義的全局變數在多個進程中是不能共用的,篇幅較長在這裡就不舉例子了,可以自己試一下。這個也是和稍後要說的線程的一個區別,線上程中,變數是可以共用的,也因此衍生出一些問題,稍後再說。
5、啟動多個進程
在正常工作使用的時候,當然不止有有個一個兩個進程,畢竟這一兩個也起不到想要的效果。那麼就需要採用更多的進程,這時候需要通過進程池來實現,就是在進程池中放好你要建立的進程,然後執行的時候,把他們都啟動起來,就可以同時進行了,在一定的環境下可以大大的提高效率。當然這個也和起初提到的有關,如果你的CPU是單核的,那麼多進程也只是起到了讓幾個任務同時在執行著,並沒有提高效率,而且啟動進程的時候還要花費一些時間,因此在多核CPU當中更能發揮優勢。
在multiprocessing中有個Pool方法,可以實現進程池。在利用進程池時可以設置要啟動幾個進程,一般情況下,它預設和你電腦的CPU核數一致,也可以自己設置,如果設置的進程數多於CPU核數,那多出來的進程會輪流調度到每個核心上執行。下麵是啟動多個進程的過程。
1 from multiprocessing import Pool
2 import os
3 import time
4 import random
5
6
7 def run(name):
8 print("子進程%s啟動--%s" % (name, os.getpid()))
9 start = time.time()
10 time.sleep(random.choice([1,2,3,4,5]))
11 end = time.time()
12 print("子進程%s結束--%s--耗時%.2f" % (name, os.getpid(), end-start))
13
14 if __name__ == "__main__":
15 print("啟動父進程")
16
17 # 創建多個進程
18 # Pool 進程池 :括弧里的數表示可以同時執行的進程數量
19 # Pool()預設大小是CPU核心數
20 pp = Pool(4)
21 for i in range(5):
22 # 創建進程,放入進程池,統一管理
23 pp.apply_async(run, args=(i,))
24
25 # 在調用join之前必須先調用close,調用close之後就不能再繼續添加新的進程了
26 pp.close()
27 # 進程池對象調用join還等待進程池中所有的子進程結束
28 pp.join()
29
30 print("結束父進程")
6、文件拷貝(單進程與多進程對比)
(1)單進程實現
1 from multiprocessing import Pool 2 import time 3 import os 4 5 # 實現文件的拷貝 6 def copyFile(rPath, wPath): 7 fr = open(rPath, 'rb') 8 fw = open(wPath, 'wb') 9 context = fr.read() 10 fw.write(context) 11 fr.close() 12 fw.close() 13 14 path = r'F:\python_note\線程、協程' 15 toPath = r'F:\python_note\test' 16 17 # 讀取path下的所有文件 18 filesList = os.listdir(path) 19 20 # 啟動for迴圈處理每一個文件 21 start = time.time() 22 for fileName in filesList: 23 copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName)) 24 25 end = time.time() 26 print('總耗時:%.2f' % (end-start))View Code
(2)多進程實現
1 from multiprocessing import Pool 2 import time 3 import os 4 5 # 實現文件的拷貝 6 def copyFile(rPath, wPath): 7 fr = open(rPath, 'rb') 8 fw = open(wPath, 'wb') 9 context = fr.read() 10 fw.write(context) 11 fr.close() 12 fw.close() 13 14 path = r'F:\python_note\線程、協程' 15 toPath = r'F:\python_note\test' 16 17 18 if __name__ == "__main__": 19 # 讀取path下的所有文件 20 filesList = os.listdir(path) 21 22 start = time.time() 23 pp = Pool(4) 24 for fileName in filesList: 25 pp.apply_async(copyFile, args=(os.path.join( 26 path, fileName), os.path.join(toPath, fileName))) 27 pp.close() 28 pp.join() 29 end = time.time() 30 print("總耗時:%.2f" % (end - start))View Code
上面兩個程式是兩種方法實現同一個目標的程式,可以將其中的文件路徑更換為你自己的路徑,可以看到最後計算出的耗時是多少。也許有人發現並不是多進程的效率就高,說的的確沒錯,因為創建進程也要花費時間,沒準啟動進程的時間遠多讓這一個核心運行所有核心用的時間要多。這個例子也只是演示一下如何使用,在大數據的任務下會有更深刻的體驗。
7、進程對象
我們知道Python是一個面向對象的語言。而且Python中萬物皆對象,進程也可以封裝成對象,來方便以後自己使用,只要把他封裝的足夠豐富,提供清晰的介面,以後使用時會快捷很多,這個就根據自己的需求自己可以試一下,不寫了。
8、進程間通信
上面提到過進程間的變數是不能共用的,那麼如果有需要該怎麼辦?通過隊列的方式進行傳遞。在父進程中創建隊列,然後把隊列傳到每個子進程當中,他們就可以共同對其進行操作。
1 from multiprocessing import Process, Queue
2 import os
3 import time
4
5
6 def write(q):
7 print("啟動寫子進程%s" % (os.getpid()))
8 for chr in ['A', 'B', 'C', 'D']:
9 q.put(chr)
10 time.sleep(1)
11 print("結束寫子進程%s" % (os.getpid()))
12
13 def read(q):
14 print("啟動讀子進程%s" % (os.getpid()))
15 while True:
16 value = q.get()
17 print("value = "+value)
18 print("結束讀子進程%s" % (os.getpid()))
19
20 if __name__ == "__main__":
21 # 父進程創建隊列,並傳遞給子進程
22 q = Queue()
23 pw = Process(target=write, args=(q,))
24 pr = Process(target=read, args=(q,))
25
26 pw.start()
27 pr.start()
28 # 寫進程結束
29 pw.join()
30 # pr進程里是個死迴圈,無法等待期結束,只能強行結束
31 pr.terminate()
32 print("父進程結束")
二、線程
1、線程
- 在一個進程內部,要同時乾多件事,就需要運行多個"子任務",我們把進程內的多個"子任務"叫做線程
- 線程通常叫做輕型的進程,線程是共用記憶體空間,併發執行的多任務,每一個線程都共用一個進程的資源
- 線程是最小的執行單元而進程由至少一個線程組成。如何調度進程和線程,完全由操作系統來決定,程式自己不能決定什麼時候執行,執行多長時間
模塊:
1、_thread模塊 低級模塊(更接近底層)
2、threading模塊 高級模塊,對_thread進行了封裝
2、啟動一個線程
同樣,先給一個多線程的例子,其中,仍然使用run函數作為其中的一個子線程,主函數為父線程。通過threading的Thread方法創建線程並開啟,join來等待子線程。
1 import threading
2 import time
3
4
5 def run():
6 print("子線程(%s)啟動" % (threading.current_thread().name))
7
8 # 實現線程的功能
9 time.sleep(1)
10 print("列印")
11 time.sleep(2)
12
13 print("子線程(%s)結束" % (threading.current_thread().name))
14
15
16 if __name__ == "__main__":
17 # 任何進程都預設會啟動一個線程,稱為主線程,主線程可以啟動新的子線程
18 # current_thread():返回線程的實例
19 print("主線程(%s)啟動" % (threading.current_thread().name))
20
21 # 創建子線程
22 t = threading.Thread(target=run, name="runThread")
23 t.start()
24
25 # 等待線程結束
26 t.join()
27
28 print("主線程(%s)結束" % (threading.current_thread().name))
3、線程間數據共用
多線程和多進程最大的不同在於,多進程中,同一個變數,各自有一份拷貝存在每個進程中,互不影響。
而多線程所有變數都由所有線程共用。所以任何一個變數都可以被任何一個線程修改,因此,線程之間共用數據最大的危險在於多個線程同時修改一個變數,容易把內容改亂了。
1 import threading
2
3
4 num = 10
5
6 def run(n):
7 global num
8 for i in range(10000000):
9 num = num + n
10 num = num - n
11
12 if __name__ == "__main__":
13 t1 = threading.Thread(target=run, args=(6,))
14 t2 = threading.Thread(target=run, args=(9,))
15
16 t1.start()
17 t2.start()
18 t1.join()
19 t2.join()
20
21 print("num = ",num)
4、線程鎖
在第三小點中已經提到了,多線程的一個缺點就是數據是共用的,如果有兩個線程正同時在修改這個數據,就會出現混亂,它自己也不知道該聽誰的了,尤其是在運算比較複雜,次數較多的時候,這種錯誤的機會會更大。
當然,解決辦法也是有的,那就是利用線程鎖。加鎖的意思就是在其中一個線程正在對數據進行操作時,讓其他線程不得介入。這個加鎖和釋放鎖是由人來確定的。
- 確保了這段代碼只能由一個線程從頭到尾的完整執行
- 阻止了多線程的併發執行,要比不加鎖時候效率低。包含鎖的代碼段只能以單線程模式執行
- 由於可以存在多個鎖,不同線程持有不同的鎖,並試圖獲取其他的鎖,可能造成死鎖導致多個線程掛起,只能靠操作系統強制終止
1 def run(n):
2 global num
3 for i in range(10000000):
4 lock.acquire()
5 try:
6 num = num + n
7 num = num - n
8 finally:
9 # 修改完釋放鎖
10 lock.release()
11
12 if __name__ == "__main__":
13 t1 = threading.Thread(target=run, args=(6,))
14 t2 = threading.Thread(target=run, args=(9,))
15
16 t1.start()
17 t2.start()
18 t1.join()
19 t2.join()
20
21 print("num = ",num)
上面這段程式是迴圈多次num+n-n+n-n的過程,變數n分別設為6和9是在兩個不同的線程當中,程式中已經加了鎖,你可以先去掉試一下,當迴圈次數較小的時候也許還能正確,但次數一旦取的較高就會出現混亂。
加鎖是在迴圈體當中,依次執行加減法,定義中說到確保一個線程從頭到尾的完整執行,也就是在計算途中,不會有其他的線程打擾。你可以想一下,如果一個線程執行完加法,正在執行減法,另一個線程進來了,它要先進行加法時的初始sum值該是多少呢,線程二不一定線上程一的什麼時候進來,萬一剛進來時候,線程一恰好給sum賦值了,而線程二仍然用的是正準備進來時候的sum值,那從這裡開始豈不已經分道揚鑣了。所以,運算的次數越多,結果會越離譜。
這個說完了,還有一個小小的改進。你是否記得讀寫文件時候書寫的一種簡便形式,通過with來實現,可以避免我們忘記關閉文件,自動幫我們關閉。當然還有一些其他地方也用到了這個方法。這裡也同樣適用。
1 # 與上面代碼功能相同,with lock可以自動上鎖與解鎖
2 with lock:
3 num = num + n
4 num = num - n
5、ThreadLocal
- 創建一個全局的ThreadLocal對象
- 每個線程有獨立的存儲空間
- 每個線程對ThreadLocal對象都可以讀寫,但是互不影響
根據名字也可以看出,也就是在本地建個連接,所有的操作在本地進行,每個線程之間沒有數據的影響。
1 import threading
2
3
4 num = 0
5 local = threading.local()
6
7 def run(x, n):
8 x = x + n
9 x = x - n
10
11 def func(n):
12 # 每個線程都有local.x
13 local.x = num
14 for i in range(10000000):
15 run(local.x, n)
16 print("%s-%d" % (threading.current_thread().name, local.x))
17
18
19 if __name__ == "__main__":
20 t1 = threading.Thread(target=func, args=(6,))
21 t2 = threading.Thread(target=func, args=(9,))
22
23 t1.start()
24 t2.start()
25 t1.join()
26 t2.join()
27
28 print("num = ",num)
6、控制線程數量
1 '''
2 控制線程數量是指控制線程同時觸發的數量,可以拿下來這段代碼運行一下,下麵啟動了5個線程,但是他們會兩個兩個的進行
3 '''
4 import threading
5 import time
6
7 # 控制併發執行線程的數量
8 sem = threading.Semaphore(2)
9
10 def run():
11 with sem:
12 for i in range(10):
13 print("%s---%d" % (threading.current_thread().name, i))
14 time.sleep(1)
15
16
17 if __name__ == "__main__":
18 for i in range(5):
19 threading.Thread(target=run).start()
上面的程式是有多個線程,但是每次限制同時執行的線程,通俗點說就是限制併發線程的上限;除此之外,也可以限制線程數量的下限,也就是至少達到多少個線程才能觸發。
1 import threading
2 import time
3
4
5 # 湊夠一定數量的線程才會執行,否則一直等著
6 bar = threading.Barrier(4)
7
8 def run():
9 print("%s--start" % (threading.current_thread().name))
10 time.sleep(1)
11 bar.wait()
12 print("%s--end" % (threading.current_thread().name))
13
14
15 if __name__ == "__main__":
16 for i in range(5):
17 threading.Thread(target=run).start()
7、定時線程
1 import threading
2
3
4 def run():
5 print("***********************")
6
7 # 延時執行線程
8 t = threading.Timer(5, run)
9 t.start()
10
11 t.join()
12 print("父線程結束")
8、線程通信
1 import threading
2 import time
3
4
5 def func():
6 # 事件對象
7 event = threading.Event()
8 def run():
9 for i in range(5):
10 # 阻塞,等待事件的觸發
11 event.wait()
12 # 重置阻塞,使後面繼續阻塞
13 event.clear()
14 print("**************")
15 t = threading.Thread(target=run).start()
16 return event
17
18 e = func()
19
20 # 觸發事件
21 for i in range(5):
22 time.sleep(2)
23 e.set()
9、一個小慄子
這個例子是用了生產者和消費者來模擬,要進行數據通信,還引入了隊列。先來理解一下。
1 import threading
2 import queue
3 import time
4 import random
5
6
7 # 生產者
8 def product(id, q):
9 while True:
10 num = random.randint(0, 10000)
11 q.put(num)
12 print("生產者%d生產了%d數據放入了隊列" % (id, num))
13 time.sleep(3)
14 # 任務完成
15 q.task_done()
16
17 # 消費者
18 def customer(id, q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("消費者%d消費了%d數據" % (id, item))
24 time.sleep(2)
25 # 任務完成
26 q.task_done()
27
28
29 if __name__ == "__main__":
30 # 消息隊列
31 q = queue.Queue()
32
33 # 啟動生產者
34 for i in range(4):
35 threading.Thread(target=product, args=(i, q)).start()
36
37 # 啟動消費者
38 for i in range(3):
39 threading.Thread(target=customer, args=(i, q)).start()
10、線程調度
1 import threading
2 import time
3
4
5 # 線程條件變數
6 cond = threading.Condition()
7
8