1. course 1.進程創建的兩種方式 1. 開啟進程的第一種方式: 2. 開啟進程的第二種方式: 3. 簡單應用 2.獲取進程pid 3.驗證進程之間的空間隔離 4. join 5.進程的其他參數 6.守護進程 7.僵屍進程孤兒進程 基於 環境( ) 主進程需要等待子進程結束之後,主進程才結束 ...
1. course
1.進程創建的兩種方式
開啟進程的第一種方式:
from multiprocessing import Process import random import time def task(name): print(f'{name} is running') time.sleep(random.randint(1, 3)) print(f'{name} is gone') if __name__ == '__main__': # 在windows環境下, 開啟進程必須在 __name__ == '__main__' 下麵 p = Process(target=task, args=('常鑫')) # 創建一個進程對象 p.start() ''' 只是想操作系統發出一個開闢子進程的信號,然後就執行下一行 這個信號操作系統收到後,會從記憶體中開闢一個子進程空間,然後將主進程所有數據copy載入到子進程,然後再調用cpu去執行開闢子進程的開銷很大 ''' print('開始') time.sleep(2) # 所以永遠先執行主進程的代碼
開啟進程的第二種方式:
from multiprocessing import Process import random import time class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run1(self): print(f'{self.name} is running') time.sleep(random.randint(1, 3)) print(f'{self.name} is gone') if __name__ == '__main__': p = MyProcess('常鑫') p.start() print('==主')
簡單應用
# 簡單應用/ from multiprocessing import Process import time def task(name): print(f'{name} is running') time.sleep(1) print(f'{name} is gone') def task1(name): print(f'{name} is running') time.sleep(2) print(f'{name} is gone') def task2(name): print(f'{name} is running') time.sleep(3) print(f'{name} is gone') if __name__ == '__main__': p1 = Process(target=task, args=('常鑫一號英雄',)) p2 = Process(target=task, args=('常鑫二號英雄',)) start_time = time.time() task(1) task1(2) task2(3) print(f'結束時間{time.time() - start_time}') # 三個進程併發或者並行的執行三個任務 # 創建進程是並行,不創建是串列
2.獲取進程pid
import os
print(f'子進程:{os.getpid()}')
print(f'主進程:{os.getppid()}')
cmd命令查看pid
tasklist 查看所有進程的pid
tasklist|findstr pycharm 查看pycharm的pid
from multiprocessing import Process
import os
print(f'子進程:{os.getpid()}')
print(f'主進程:{os.getppid()}')
def task(name):
print(f'子進程:{os.getpid()}')
print(f'主進程:{os.getppid()}')
if __name__ == '__main__':
p = Process(target=task, args=('常鑫',))
p.start()
print('==主開始')
print(f'==主{os.getpid()}')
print(f'===主{os.getppid()}')
3.驗證進程之間的空間隔離
初始子進程的時候copy主進程,之後主進程和子進程沒有任何聯繫,不共同享用任何內容
from multiprocessing import Process
import time
name = '常鑫'
def task():
global name
name = '郭記'
print(f'子進程的名字是: {name}')
if __name__ == '__main__':
p = Process(target=task)
p.start()
time.sleep(1)
print(f'主進程的名字是: {name}')
----------------------------分割線-----------------------------
lst = ['郭蘇慧', ]
def task1():
lst.append('郭記')
print(f'子進程的名字是: {lst}')
if __name__ == '__main__':
p = Process(target=task1)
p.start()
time.sleep(2)
print(f'主進程的名字是: {lst}')
4. join
join 讓主進程等待子進程結束之後再執行主進程
join 只針對主進程,如果join下麵多次join 他是不阻塞的
join 就是阻塞,主進程有join,主進程下麵的代碼一律不執行,直到進程執行完畢之後,在執行.
# 正確 重點
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(1, 4):
p = Process(target=task, args=(i,))
l1.append(p)
p.start()
for i in l1:
i.join()
print(f'==主{time.time() - start_time}')
錯誤示範:
for i in range(1,4):
p = Process(target=task,args=(i,))
p.start()
p.join()
'''
p1 = Process(target=task,args=(1,))
p1.start()
p1.join()
p2 = Process(target=task,args=(2,))
p2.start()
p2.join()
p3 = Process(target=task,args=(3,))
p3.start()
p3.join()
'''
# join讓主進程等待子進程結束之後再執行主進程
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
p = Process(target=task, args=('常鑫',))
p.start()
p.join()
print('==主進程開始')
# 多個進程使用join
def task(name, sec):
print(f'{name} is running')
time.sleep(sec)
print(f'{name} is gone')
if __name__ == '__main__':
star_time = time.time()
start_time = time.time()
p1 = Process(target=task, args=('常鑫', 1))
p2 = Process(target=task, args=('李業', 2))
p3 = Process(target=task, args=('海狗', 3))
p1.start()
p2.start()
p3.start()
# join 只針對主進程,如果join下麵多次join 他是不阻塞的.
p1.join()
p2.join()
p3.join()
print(f'==主{time.time()-start_time}')
# ----------------------------------------------------------
def task(name, sec):
print(f'{name}is running')
time.sleep(sec)
print(f'{name} is gone')
if __name__ == '__main__':
start_time = time.time()
p1 = Process(target=task, args=('常鑫', 3))
p2 = Process(target=task, args=('李業', 2))
p3 = Process(target=task, args=('海狗', 1))
p1.start()
p2.start()
p3.start()
# join就是阻塞
p1.join() # 等2s
print(f'==主1:{time.time() - start_time}')
p2.join()
print(f'===主2:{time.time() - start_time}')
p3.join()
print(f'==主3:{time.time() - start_time}')
5.進程的其他參數
p.terminate() # 殺死子進程 ***
print(p.is_alive()) # *** 判斷子進程 False True
p.join() # ***
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
p = Process(target=task, args=('常鑫', ), name='Alex')
p.start()
time.sleep(1)
p.terminate() # 殺死子進程 ***
p.join() # ***
time.sleep(1)
print(p.is_alive()) # *** 判斷子進程 False
print(p.name)
p.name = 'sb'
print(p.name)
print('主程式開始')
6.守護進程
p.daemon = True
將p子進程設置成守護進程,只要主進程結束,守護進程馬上結束 一定要在子進程開啟之前設置
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone')
if __name__ == '__main__':
p = Process(target=task, args=('常鑫',)) # 創建一個進程對象
p.daemon = True # 將p子進程設置成守護進程,只要主進程結束,守護進程馬上結束.
p.start()
# p.daemon = True # 一定要在子進程開啟之前設置
time.sleep(1)
print('===主')
7.僵屍進程孤兒進程
基於unix
環境(linux, macOS
)
主進程需要等待子進程結束之後,主進程才結束
==主進程時刻監測子進程的運行狀態,當子進程結束之後,一段時間之內,將子進程進行回收.==
為什麼主進程不在子進程結束後馬上對其回收呢?
- 主進程與子進程是非同步關係.主進程無法馬上捕獲子進程什麼時候結束.
- 如果子進程結束之後馬上再記憶體中釋放資源,主進程就沒有辦法監測子進程的狀態了.
unix
針對於上面的問題,提供了一個機制.所有的子進程結束之後,立馬會釋放掉文件的操作鏈接,記憶體的大部分數據,但是會保留一些內容: ==進程號,結束時間,運行狀態==,等待主進程監測,回收.
僵屍進程:==所有的子進程結束之後,在被主進程回收之前,都會進入僵屍進程狀態.==
僵屍進程有無危害???
如果父進程不對僵屍進程進行回收(
wait/waitpid
),產生大量的僵屍進程,這樣就會占用記憶體,占用進程pid
號.孤兒進程:
父進程由於某種原因結束了,但是你的子進程還在運行中,這樣你的這些子進程就成了孤兒進程.你的父進程如果結束了,你的所有的孤兒進程就會被
init
進程的回收,init
就變成了你的父進程,對你進行回收.僵屍進程如何解決???
父進程產生了大量子進程,但是不回收,這樣就會形成大量的僵屍進程,解決方式==就是直接殺死父進程==,將所有的僵屍進程變成孤兒進程進程,由
init
進行回收.
8.互斥鎖
互斥鎖:
指散佈在不同進程之間的若幹程式片斷,當某個進程運行其中一個程式片段時,其它進程就不能運行它們之中的任一程式片段,只能等到該進程運行完這個程式片段後才可以運行的一種類似於"鎖"的機制
版本一:
現在是所有的進程都併發的搶占印表機.
併發是以效率優先的,但是目前我們的需求: 順序優先.
多個進程共強一個資源時, 要保證順序優先: 串列,一個一個來.
版本二:
我們利用join 解決串列的問題,保證了順序優先,但是這個誰先誰後是固定的.
這樣不合理. 你在爭搶同一個資源的時候,應該是先到先得,保證公平.
lock與join的區別.
共同點: 都可以把併發變成串列, 保證了順序.
不同點: join人為設定順序,lock讓其爭搶順序,保證了公平性.
版本三:(for i in 迴圈)
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import sys
def task1(p, lock):
lock.acquire()
print(f'{p}開始列印了')
time.sleep(random.randint(1, 3))
print(f'{p}開始列印了')
lock.release()
def task2(p, lock):
lock.acquire()
print(f'{p}開始列印了')
time.sleep(random.randint(1, 3))
print(f'{p}開始列印了')
lock.release()
def task3(p, lock):
lock.acquire()
print(f'{p}開始列印了')
time.sleep(random.randint(1, 3))
print(f'{p}開始列印了')
lock.release()
if __name__ == '__main__':
mutex = Lock()
for i in range(1, 4):
p = Process(target=getattr(sys.modules[__name__], f'task{i}'), args=(i, mutex))
p.start()
9.進程之間的通信
進程在記憶體級別是隔離的,但是文件在磁碟上
1.基於文件通信
搶票系統.
1. 先可以查票.查詢餘票數. 併發
2. 進行購買,向服務端發送請求,服務端接收請求,在後端將票數-1,返回到前端. [串列].
當很多進程共強一個資源(數據)時, 你要保證順序(數據的安全),一定要串列.
互斥鎖: 可以公平性的保證順序以及數據的安全.
基於文件的進程之間的通信:
1.效率低.
2.自己加鎖麻煩而且很容易出現死鎖.
from multiprocessing import Process
from multiprocessing import Lock
import random
import time
import json
import os
def search():
time.sleep(random.randint(1, 3)) # 模擬網路延遲(查詢環節)
with open('db.json', encoding='utf-8') as f1:
dic = json.load(f1)
print(f'{os.getpid()}查看了票的剩餘量,還剩餘{dic["count"]}')
def paid():
with open('db.json', encoding='utf-8') as f1:
dic = json.load(f1)
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(random.randint(1, 3))
with open('db.json', encoding='utf-8', mode='w') as f1:
json.dump(dic, f1)
print(f'{os.getpid()}購票成功, 還剩餘{dic["count"]}票')
else:
time.sleep(1)
print(f'{os.getpid()}購票未成功')
def task(lock):
search()
lock.acquire()
paid()
lock.release()
if __name__ == '__main__':
mutex = Lock()
for i in range(5):
p = Process(target=task, args=(mutex,))
p.start()
2.基於隊列通信
隊列: 把隊列理解成一個容器,這個容器可以承載一些數據,
隊列的特性: 先進先出永遠保持這個數據. FIFO 羽毛球筒.
q.put(5555) 當隊列滿了時,在進程put數據就會阻塞.
print(q.get()) 當數據取完時,在進程get數據也會出現阻塞,直到某一個進程put數據.
print(q.get(timeout=3)) 阻塞3秒,3秒之後還阻塞直接報錯.
print(q.get(block=False)) 只要遇到阻塞就會報錯.
利用隊列Queue改進選票系統作業:
- 票數放入隊列中存儲
- 開啟多個進程進行選票,查票為併發效果,買票為串列效果
- 購買成功、失敗都需要提示
from multiprocessing import Process
from multiprocessing import Queue
import random
import time
import os
def search(q):
get = q.get() # 取出
print(f'{os.getpid()}查看了票的剩餘量,還剩餘{get["count"]}')
q.put(get) # 輸入
def paid(q):
time.sleep(random.randint(1, 3))
q_dic = q.get() # 取出
q.put(q_dic) # 輸入
if q_dic["count"] > 0:
q_dic["count"] -= 1
print(f"{os.getpid()}購買成功!{q_dic['count']} ")
try:
q.put(q_dic, block=False)
except Exception:
pass
else:
print(f"{os.getpid()}購買失敗")
def task(q):
search(q)
paid(q)
if __name__ == '__main__':
q = Queue(1)
q.put({"count": 3})
for i in range(5):
p = Process(target=task, args=(q,))
p.start()
模擬雙十一排隊搶小米手機,多用戶搶購,只能選取前10個用戶:
開啟多個用戶搶購買手機。
只能限定10人購買。
最終將10個用戶的排名展示出來。
import os
from multiprocessing import Queue
from multiprocessing import Process
def task(q):
try:
q.put(f'{os.getpid()}', block=False)
except Exception:
return
if __name__ == '__main__':
q = Queue(10)
for i in range(100):
p = Process(target=task, args=(q,))
p.start()
for i in range(1, 10):
print(f'排名第{i}的用戶是{q.get()}')
10.生產者消費者模塊
編程思想,模型,設計模式,理論等等,都是交給你一種編程的方法,以後你遇到類似的情況,套用即可.
生產者消費者模型三要素:
生產者: 產生數據的
消費者: 接收數據做進一步處理的
容器: 盆(隊列)
那麼隊列容器起到什麼作用? 起到緩衝的作用,平衡生產力與消費力,解耦.
from multiprocessing import Process
from multiprocessing import Queue
import random
import time
def producer(q, name):
for i in range(1, 6):
time.sleep(random.randint(1, 2))
res = f'{i}號包子'
q.put(res)
print(f'生產者{name}生產了{res}')
def consumer(q, name):
while 1:
try:
food = q.get(timeout=3)
time.sleep(random.randint(1, 3))
print(f'消費者{name}吃了吃了吃了{food}')
except Exception:
pass
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q, '常鑫'))
p2 = Process(target=consumer, args=(q, '常鑫鑫'))
p1.start()
p2.start()
2. thread
1.線程的理論知識
1. 什麼是線程:進程是資源單位, 線程是執行單位.
進程:進程會在記憶體中開闢一個進程空間,將主進程的資料數據全部複製一份,線程會執行裡面的代碼.
2. 線程vs進程:
1. 開啟進程的開銷非常大,比開啟線程的開銷大很多.
2. 開啟線程的速度非常快,要快幾十倍到上百倍.
3. 線程與線程之間可以共用數據,進程與進程之間需藉助隊列等方法實現通信.
3. 線程的應用: 數據共用, 開銷小,速度快,
併發: 一個cpu 看起來像是同時執行多個任務,單個進程開啟三個線程,併發的執行任務
主線程子線程沒有地位之分,但是,一個進程誰在幹活? 一個主線程在幹活,當幹完活了,你得等待其他線程幹完活之後,才能結束本進程
==什麼是線程==
一條流水線的工作流程.
進程: 在記憶體中開啟一個進程空間,然後將主進程的所有的資源數據複製一份,然後調用cpu去執行這些代碼.
之前的描述不夠具體:
開啟一個進程:
在記憶體中開啟一個進程空間,然後將主進程的所有的資源數據複製一份,然後調用線程去執行代碼
進程是資源單位, 線程是執行單位.
以後你描述開啟一個進程:
開啟一個進程:進程會在記憶體中開闢一個進程空間,將主進程的資料數據全部複製一份,線程會執行裡面的代碼.
==線程vs進程==
- 開啟進程的開銷非常大,比開啟線程的開銷大很多.
- 開啟線程的速度非常快.要快幾十倍到上百倍.
- 線程線程之間可以共用數據,進程與進程之間需藉助隊列等方法實現通信.
==線程的應用==
併發: 一個cpu 看起來像是同時執行多個任務.
單個進程開啟三個線程.併發的執行任務.
開啟三個進程併發的執行任務.
文本編輯器:
- 輸入文字.
- 在屏幕上顯示.
- 保存在磁碟中.
開啟多線程就非常好了:
數據共用, 開銷小,速度快.
主線程子線程沒有地位之分,但是,一個進程誰在幹活? 一個主線程在幹活,當幹完活了,你得等待其他線程幹完活之後,才能結束本進程
2.開啟線程的兩張方式
第一種方式:
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(1)
print(f'{name} in gone')
if __name__ == '__main__':
p1 = Thread(target=task, args=('常鑫',))
p1.start()
print('===主線程')
第二種方式:
from threading import Thread
import time
class MyThread(Thread):
def __init__(self, name, l1, s1):
super().__init__()
self.name = name
self.l1 = l1
self.s1 = s1
def run(self):
print(f'{self.name} is running')
print(f'{self.l1} is running')
print(f'{self.s1} is running')
time.sleep(1)
print(f'{self.name} is gone')
print(f'{self.l1} is gone')
print(f'{self.s1} is gone')
if __name__ == '__main__':
p1 = MyThread('常鑫', [1, 2, 3], '100')
p1.start()
print('===主線程')
3.線程vs進程的代碼對比
開啟速度對比,線程比進程
from multiprocessing import Process def work(): print('hello') if __name__ == '__main__': # 在主進程下開啟線程 t = Process(target=work) t.start() print('主線程/主進程')
from threading import Thread import time def task(name): print(f'{name} is running') time.sleep(1) print(f'{name} is gone') if __name__ == '__main__': t1 = Thread(target=task, args=('海狗',)) t1.start() print('===主線程') # 線程是沒有主次之分的.
對比
pid
==同一個pid
==from threading import Thread import os def task(): print(os.getpid()) if __name__ == '__main__': t1 = Thread(target=task) t2 = Thread(target=task) t1.start() t2.start() print(f'===主線程{os.getpid()}')
同一個進程內線程共用內部數據
同一進程內的資源數據對於這個進程的多個線程來說是共用的. from threading import Thread x = 3 def task(): global x x = 100 if __name__ == '__main__': t1 = Thread(target=task) t1.start() t1.join() print(f'===主線程{x}')
4.線程的相關其他方法(瞭解)
# Thread實例對象的方法
p1.setName('子線程1') # 設置線程名
p1.getName() # 返回線程名
---print(p1.name) # 獲取線程名 ***
print(p1.isAlive()) # 返回線程是否活動的。
# threading模塊提供的一些方法:
print(current_thread()) # 獲取當前線程的對象
print(currentThread()) # 獲取當前線程的對象
print(enumerate()) # 返回一個列表,包含所有的線程對象
---print(activeCount()) # *** 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
from threading import currentThread
from threading import enumerate
from threading import activeCount
import os
import time
x = 9
def task():
print(currentThread())
time.sleep(1)
print('666')
if __name__ == '__main__':
p1 = Thread(target=task, name='p1') # name 設置線程名
p2 = Thread(target=task, name='p2') # name 設置線程名
p1.start()
p2.start()
# Thread實例對象的方法
p1.setName('子線程1') # 設置線程名
p2.setName('子線程1') # 設置線程名
p1.getName() # 返回線程名
p2.getName() # 返回線程名
print(p1.name) # 獲取線程名 ***
print(p2.name) # 獲取線程名 ***
print(p1.isAlive()) # 返回線程是否活動的。
print(p2.isAlive()) # 返回線程是否活動的。
# threading模塊提供的一些方法:
print(currentThread()) # 獲取當前線程的對象
print(enumerate()) # 返回一個列表,包含所有的線程對象
print(activeCount()) # *** 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
print(f'主線程{os.getpid()}')
5.守護線程(考點)
join: 阻塞 告知主線程要等待我子線程執行完畢之後再執行主線程
主線程什麼時候結束???
守護線程 等待非守護子線程以及主線程結束之後,結束.
from threading import Thread
import time
def foo():
print(123) # 1
time.sleep(1)
print("end123") # 4
def bar():
print(456) # 2
time.sleep(2)
print("end456") # 5
t1 = Thread(target=foo)
t2 = Thread(target=bar)
t1.daemon = True
t1.start()
t2.start()
print("main-------") # 3
# 結果:
# 123
# 456
# main-------
# end123
# end456
6.互斥鎖(考點)
正常情況加鎖之後編程串列
鎖之後加上延遲就不一定,有的可能就會出現插隊現象
from threading import Thread
from threading import Lock
import time
import random
x = 10
def task(lock):
lock.acquire()
time.sleep(random.randint(1, 3)) # 卡點
global x
temp = x
time.sleep(0.1)
temp = temp - 1
x = temp
lock.release()
if __name__ == '__main__':
mutex = Lock()
l1 = []
for i in range(10):
t = Thread(target=task, args=(mutex,))
l1.append(t)
t.start()
time.sleep(1)
print(f'主線程{x}')
7.死鎖現象與遞歸鎖
- 死鎖現象就是: A進程那著A鑰匙去找B鑰匙,B進程拿著B鑰匙去找A鑰匙
- 遞歸鎖: 可以解決死鎖現象,業務需要多個鎖時,優先考慮遞歸鎖
- 鎖必須寫成==
lock_A = lock_B = RLock()
==格式,原理是==pid
==都一樣,每鎖一次,鎖的數量加一,解開的時候減一.鎖的數量如果不為零其他線程不能搶鎖==from threading import RLock
==導入模塊
死鎖現象:
from threading import Thread
from threading import Lock
import time
lock_A = Lock()
lock_B = Lock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到的A')
lock_B.acquire()
print(f'{self.name}拿到的B')
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到的B')
time.sleep(0.1)
lock_A.acquire()
print(f'{self.name}拿到的A')
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
遞歸鎖:
from threading import Thread
from threading import RLock
import time
lock_A = lock_B = RLock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到的A')
lock_B.acquire()
print(f'{self.name}拿到的B')
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到的B')
lock_A.acquire()
print(f'{self.name}拿到的A')
time.sleep(1)
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
8.信號量
也是一種鎖, 控制併發數量
==from threading import current_thread
==獲取當前線程的對象模塊
==from threading import Semaphore
==導入信號量模塊
==sem = Semaphore(5)
==實例化信號量 不寫時無窮大
==sem.acquire()
==函數裡面獲取信號量
from threading import Thread
from threading import Semaphore
from threading import current_thread
import random
import time
sem = Semaphore(5)
def task():
sem.acquire()
print(f'{current_thread().name} 房間')
time.sleep(random.randint(1, 3))
sem.release()
if __name__ == '__main__':
for i in range(30):
t = Thread(target=task, )
t.start()
9. ==GIL
==全局解釋器鎖
好多自稱大神的說,GIL鎖就是python的致命缺陷,Python不能多核,併發不行等等 .....==理論上來說:單個進程的多線程可以利用多核.==
但是,開發
Cpython
解釋器的程式員,給進入解釋器的線程加了鎖.為什麼加鎖?
- 當時都是單核時代,而且
cpu
價格非常貴. - 如果不加全局解釋器鎖, 開發
Cpython
解釋器的程式員就會在源碼內部各種主動加鎖,解鎖,非常麻煩,各種死鎖現象等等.他為了省事兒,直接進入解釋器時給線程加一個鎖. - ==優點: 保證了
Cpython
解釋器的數據資源的安全.== - ==缺點: 單個進程的多線程不能利用多核.==
- 當時都是單核時代,而且
Jpython
沒有GIL鎖.pypy
也沒有GIL鎖.現在多核時代, 我將
Cpython
的GIL鎖去掉行麽?因為
Cpython
解釋器所有的業務邏輯都是圍繞著單個線程實現的,去掉這個GIL鎖,幾乎不可能.==單個進程的多線程可以併發,但是不能利用多核,不能並行. 多個進程可以併發,並行.==
==io
密集型: 單個進程的多線程合適,併發執行==
==計算密集型:多進程的並行==
10.==GIL
==鎖與==lock
==鎖的區別
- 相同點: 都是同種鎖,互斥鎖.
- 不同點:
- GIL鎖全局解釋器鎖,保護解釋器內部的資源數據的安全.
- GIL鎖 上鎖,釋放無需手動操作.
- 自己代碼中定義的互斥鎖保護進程中的資源數據的安全.
- 自己定義的互斥鎖必須自己手動上鎖,釋放鎖.
- GIL鎖全局解釋器鎖,保護解釋器內部的資源數據的安全.
11.驗證計算密集型IO密集型的效率
==
io
密集型: 單個進程的多線程的併發效率高合適.併發執行====計算密集型:多進程的併發並行效率高.並行==
代碼驗證:
計算密集型: 單個進程的多線程併發 vs 多個進程的併發並行 from multiprocessing import Process from threading import Thread import time def task(): count = 0 for i in range(30000000): # (三千萬) count += 1 if __name__ == '__main__': # 多進程的併發,並行 2.3737263679504395秒 start_time = time.time() l1 = [] for i in range(4): p = Process(target=task,) l1.append(p) p.start() for i in l1: i.join() print(f'執行時間:{time.time()-start_time}') # 多線程的併發 6.290118932723999秒 start_time = time.time() l1 = [] for i in range(4): p = Thread(target=task,) l1.append(p) p.start() for i in l1: i.join() print(f'執行時間:{time.time()-start_time}') 計算密集型: 多進程的併發並行效率高.
# IO密集型: 單個進程的多線程併發 vs 多個進程的併發並行 from multiprocessing import Process from threading import Thread import time def task(): count = 0 time.sleep(1) count += 1 if __name__ == '__main__': # 多進程的併發,並行 3.0123958587646484秒 start_time = time.time() l1 = [] for i in range(50): p = Process(target=task, ) l1.append(p) p.start() for p in l1: p.join() print(f'執行效率:{time.time() - start_time}') # 多線程的併發 1.0087950229644775秒 start_time = time.time() l1 = [] for i in range(50): p = Thread(target=task,) l1.append(p) p.start() for p in l1: p.join() print(f'執行效率:{time.time()- start_time}') 對於IO密集型: 單個進程的多線程的併發效率高.
12.多線程實現==socket
==通信
無論是多線程還是多進程,如果按照之前的寫法,來一個客戶端請求,我就開一個線程,來一個請求開一個線程,應該是這樣: 你的電腦允許範圍內,開啟的線程進程數量越多越好.
服務端:
from threading import Thread
import socket
def communicate(conn, addr):
while 1:
try:
from_client_data = conn.recv(1024)
print(f'來{addr[1]}的信息{from_client_data.decode("utf-8")}')
to_client_data = input('>>>').strip()
conn.send(to_client_data.encode('utf-8'))
except Exception:
break
conn.close()
def _accket():
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
while 1:
conn, addr = server.accept()
t = Thread(target=communicate, args=(conn, addr))
t.start()
if __name__ == '__main__':
_accket()
客戶端:
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while 1:
try:
to_server_data = input('>>>').strip()
client.send(to_server_data.encode('utf-8'))
from_server_data = client.recv(1024)
print(f'來自伺服器的消息: {from_server_data.decode("utf-8")}')
except Exception:
break
client.close()
13.進程池線程池
from concurrent.futures import ProcessPoolExecutor # 線程池模塊
from concurrent.futures import ThreadPoolExecutor # 進程池模塊
p = ProcessPoolExecutor() # 預設不寫,進程池裡面的進程數與cpu核個數相等(並行(並行+併發))
t = ThreadPoolExecutor() # 預設不寫, cpu核個數*5 線程數 (併發)
print(os.cpu_count()) # 查看電腦幾核
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import random
import time
import os
print(os.cpu_count()) # 查看電腦幾核
def task():
print(f'pid號: {os.getpid()} 來了')
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
# 開啟進程池 (並行(並行+併發))
p = ProcessPoolExecutor() # 預設不寫,進程池裡面的進程數與cpu個數相等
for i in range(20):
p.submit(task, )
# 開啟線程池 (併發)
t = ThreadPoolExecutor() # 預設不寫, cpu個數*5 線程數
for i in range(40):
t.submit(task, )
14.阻塞 非阻塞 非同步 同步
問題出在哪裡?
1. 分析結果的過程是串列,效率低.
2. 你將所有的結果全部都爬取成功之後,放在一個列表中,分析.
問題1解決:
在開進程池,再開進程,耗費資源.
'''
爬取一個網頁需要2s,併發爬取10個網頁:2.多s.
分析任務: 1s. 10s. 總共12.多秒.
現在這個版本的過程:
非同步發出10個爬取網頁的任務,然後4個進程併發(並行)的先去完成4個爬取網頁的任務,然後誰先結束,誰進行下一個
爬取任務,直至10個任務全部爬取成功.
將10個爬取結果放在一個列表中,串列的分析.
爬取一個網頁需要2s,分析任務: 1s,總共3s,總共3.多秒(開啟進程損耗).
. 10s.
下一個版本的過程:
非同步發出10個 爬取網頁+分析 的任務,然後4個進程併發(並行)的先去完成4個爬取網頁+分析 的任務,
然後誰先結束,誰進行下一個 爬取+分析 任務,直至10個爬取+分析 任務全部完成成功.
'''
回調函數是主進程幫助你實現的, 回調函數幫你進行分析任務. 明確了進程的任務: 只有一個網路爬取.
分析任務: 回調函數執行了.對函數之間解耦.
極值情況: 如果回調函數是IO任務,那麼由於你的回調函數是主進程做的,所以有可能影響效率.
回調不是萬能的,如果回調的任務是IO,
那麼非同步 + 回調機制 不好.此時如果你要效率只能犧牲開銷,再開一個線程進程池.
非同步就是回調! 這個是錯的!! 非同步,回調是兩個概念.
'''
如果多個任務,多進程多線程處理的IO任務.
# 1. 剩下的任務 非IO阻塞. 非同步 + 回調機制
# 2. 剩下的任務 IO << 多個任務的IO 非同步 + 回調機制
# 3. 剩下的任務 IO >= 多個任務的IO 第二種解決方式,或者兩個進程線程池.
'''
15.非同步 調用機制
16.事件==Event
==
17.協程的初識
18.協程