python day 19 2019/10/31 [TOC] 學習資料來自老男孩教育 1. 購物商城作業要求 2. 多進程 2.1 簡述多進程 程式 :是一個指令的集合 進程 :正在執行的程式;或者說,當你運行一個程式,你就啟動了一個進程。 1. 編寫完的代碼,沒有運行時,稱為程式;正在運行的代碼, ...
目錄
python day 19
2019/10/31
學習資料來自老男孩教育
1. 購物商城作業要求
2. 多進程
2.1 簡述多進程
程式:是一個指令的集合
進程:正在執行的程式;或者說,當你運行一個程式,你就啟動了一個進程。
1. 編寫完的代碼,沒有運行時,稱為程式;正在運行的代碼,稱為進程
2. 程式是死的(靜態的),進程是活的(動態的)
操作系統輪流讓各個任務交替執行,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。
多進程中,每個進程中所有數據(包括全局變數)都各自擁有一份,互不影響。
'''
模擬多任務處理:一邊唱歌一邊跳舞
'''
import time
def sing():
for i in range(5):
print('唱歌')
dance()
time.sleep(0.2)
def dance():
print('跳舞')
if __name__ == '__main__':
sing()
2.2 multiprocessing模塊,創建多進程程式
程式開始運行時,首先會創建一個主進程(或者叫父進程)
在主進程下,我們可以創建新的進程(子進程),子進程依賴於主進程,如果主進程結束,程式會退出。
python提供了非常好用的多進程包multiprocessing,藉助這個包,可以軟體構件完成從單進程到併發執行的轉換。
mulprocessing模塊提供了一個Process類來創建一個進程對象.
from multiprocessing import Process
import time
def sing(name):
for i in range(5):
print(''.join([name,'唱歌']))
time.sleep(1)
def dance():
for i in range(5):
print('跳舞')
time.sleep(1)
if __name__ == '__main__':
p1 = Process(target=sing,args=('lanxing',))
# target表示調用對象,args表示調用對象的位置參數元組
p2 = Process(target=dance,name='進程2')
print(p1.name)#就是進程的id,即pid
print(p2.name)
p1.start()
p2.start()
p1.join()
p2.join()
Process(target,name,args),Process類初始化時有三個屬性。
Process類的常用方法:
p.start():啟動進程,並調用該子進程中的p.run()
p.run():進程啟動時運行的方法,正是它去調用target指定的函數,當我們自定義類時,類中一定要實現該方法。
p.terminate():強制終止進程p,不會進行任何清理操作
p.is_alive():如果p仍然運行,返回True,否則就是False。
p.join([timeout]):主進程等待p纏上,timeout是可選的超時時間。
Process類常用屬性:
name:當前進程實例別名,預設為Process-N,N為從1開始遞增的整數。
pid:當前進程實例的PID值
2.3 if name=='main'的說明
if name == “main”:說明
一個python的文件有兩種使用的方法,第一是直接作為程式執行,第二是import到其他的python程式中被調用(模塊重用)執行。
因此if name == 'main': 的作用就是控制這兩種情況執行代碼的過程,name 是內置變數,用於表示當前模塊的名字。
在if name == 'main': 下的代碼只有在文件作為程式直接執行才會被執行,而import到其他程式中是不會被執行的
在 Windows 上,子進程會自動 import 啟動它的這個文件,而在 import 的時候是會執行這些語句的。如果不加if name == "main":的話就會無限遞歸創建子進程
所以必須把創建子進程的部分用那個 if 判斷保護起來
import 的時候 name 不是 main ,就不會遞歸運行了
2.4 多進程中的變數
全局變數在多個進程中不共用:進程之間的數據是獨立的,預設情況下互不影響。
創建新的進程還能夠使用類的方式,可以自定義一個類,繼承Process類,實例化這個類的時候,就等同於實例化一個進程對象。
2.5 進程池
進程池:用來創建多個進程。
當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態類生成多個進程,但如果是上百甚至上千個進程,手動的去創建進程的工程量巨大,此時就可以用到multiprocessing模塊提供的Pool。
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會創建一個的新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程來執行。
multiprocessing.Pool常⽤函數解析:
apply_async(func[, args[, kwds]]) : 使⽤⾮堵塞⽅式調⽤func(並⾏執⾏, 堵塞⽅式必須等待上⼀個進程退出才能執⾏下⼀個進程) , args為傳遞給func的參數列表, kwds為傳遞給func的關鍵字參數列表;
apply(func[, args[, kwds]])(瞭解即可幾乎不用) 使⽤阻塞⽅式調⽤func.
close(): 關閉Pool, 使其不再接受新的任務;
terminate(): 不管任務是否完成, ⽴即終⽌;(用得比較少)
join(): 主進程阻塞, 等待⼦進程的退出, 必須在close或terminate之後使⽤;
from multiprocessing import Pool
import random,time
def work(num):
print(random.random()*num)
time.sleep(3)
if __name__=='__main__':
po = Pool(3) #定義一個進程池,最大進程數為3,預設大小為CPU核數。
for i in range(10):
po.apply_async(work,(i,)) # apply_async選擇要調用的目標,每次迴圈會用空出來的子進程去調用目標
po.close() #進程池關閉之後不再接收新的請求
po.join() #等待po中所有子進程結束,必須放在close後面。
# 在多進程中,主進程一般用來等待,真正的任務都在子進程中執行。
2.6 進程間通信Queue
多進程之間,預設是不共用數據的。
通過Queue(隊列Q)可以實現進程中的數據傳遞。
Q本身是一個消息隊列。
如何添加消息(入隊操作):
from multiprocessing import Queue
q = Queue(3) # 初始化一個Queue對象,最多可接受3條消息
q.put('消息1') #添加的消息數據類型不限
q.put('消息2')
q.put('消息3')
print(q.full())
可以使用multiprocessing模塊的Queue實現多進程之間的數據傳遞。
初始化Queue()對象時(例如:q=Queue()),若括弧中沒有指定最大可接收消息數或者數量為負值,那麼就代表可接受的消息數量沒有上限。
Queue.qsize():返回當前隊列包含的消息數量
Queue.empty():如果隊列為空,返回True,否則返回False
Queue.full():如果隊列滿了,返回True,反之False
Queue.get([block[,timeout]]):獲取隊列中的一條消息,然後將其從隊列中移除,block預設值為True。
如果block使⽤預設值, 且沒有設置timeout(單位秒) , 消息列隊如果為空, 此時程式將被阻塞(停在讀取狀態) , 直到從消息列隊讀到消息為⽌,如果設置了timeout, 則會等待timeout秒, 若還沒讀取到任何消息, 則拋出"Queue.Empty"異常。
如果block值為False, 消息列隊如果為空,則會⽴刻拋出“Queue.Empty”異常。
Queue.get_nowait():相當於Queue.get(False)
Queue.put(item,[block[,timeout]]):將item消息寫入隊列,block預設值為True
如果block使用預設值,且沒有設置timeout(單位秒),消息隊列如果已經沒有空間可寫入,此間程式將被阻塞(停在寫入狀態),直到從消息隊列騰出空間為止,如果設置了Ture和timeout,則會等待timeout秒,若還沒空間,則拋出Queue.Full異常。
如果block值為False,消息隊列如果還沒有空間可寫入,則會立刻拋出Queue.Full異常。
Queue.put_nowait(item):相當於Queue.put(item,False)
進程池創建的進程之間通信:如果要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue()而不是multiprocessing.Queue()。否則會得到一個報錯信息:RuntimeError:Queue objects should only be shared between processesthrough inheritance.
3. 線程
3.1 線程介紹
線程:實現多任務的另一種形式
一個進程中,也經常需要同時做多件事,就需要同時運行多個"子任務",這些子任務,就是線程。
線程又被稱為輕量級進程(lightweight process),是更小的執行單元。
一個進程可擁有多個並行(concurrent)線程,當中每一個線程,共用當前進程的資源。
一個進程中的線程共用相同的記憶體單元/記憶體地址空間-->可以訪問相同的變數和對象,而且它們從同一堆中分配對象-->通信/數據交換/同步操作
由於線程間的通信是在同一地址空間上進行的,所以不需要額外的通信機制,這就使得通信更簡便而且信息傳遞的速度也更快。
程式裡面需要大量計算,就使用多進程,如果IO流多,就使用多線程。
3.2 線程和進程的區別
進程是系統進行資源分配和調度的一個獨立單位。
進程在執行過程中擁有獨立的記憶體單元,而多個線程共用記憶體,從而極大地提高了程式的運行效率。
一個程式至少有一個進程,一個進程至少有一個線程。
線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。
線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源,但是它可與同屬一個進程的其他的線程共用進程所擁有的全部資源。
線程的劃分尺度小於進程(資源比進程少),使得多線程程式的併發性高。
線程不能夠獨立執行,必須依存在進程中。
線程和進程在使用上各有優缺點:線程執行開銷小,但不利於資源的管理和保護;而進程正相反。
3.3 thread模塊和threading模塊
python的thread模塊是比較底層的模塊,在各個操作系統中表現形式不同(低級模塊)。
python的threading模塊是對thread做了一些包裝的,可以更加方便地被使用(高級模塊)。
thread有一些缺點,在threading中得到了彌補,所以直接學習threading模塊。
import threading
if __name__=='__main__':
# 任何進程預設會啟動一個線程,這個線程稱為主線程,主線程可以啟動新的子線程。
# current_thread():返回當前線程的實例
# .name:當前線程的名稱
print('主線程%s啟動'%(threading.current_thread().name))
import threading,time
def saySorry():
print('子線程啟動%s啟動'%(threading.current_thread().name))
time.sleep(1)
print('親愛的,我錯了,我能吃飯了嗎?')
if __name__=='__main__':
print('主線程%s啟動'%(threading.current_thread().name))
for i in range(5):
t=threading.Thread(target=saySorry) # Thread():指定線程要執行的代碼
t.start()
3.4 創建線程
創建線程有兩種方式:
- 第一,通過threading.Thread直接線上程中運行函數;
- 第二,通過繼承threading.Thread類來創建線程
這種方法只需要重載threading.Thread類的run方法,然後調用start()開啟線程就可以了。
import threading,time
class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msq = ''.join(["I'm'",self.name,'@',str(i)])
# name屬性中保存了當前線程的名字
print(msq)
if __name__=='__main__':
t = MyThread()
t.start()
3.5 線程的5種狀態
多線程程式的執行順序是不確定的(操作系統決定)。當執行到sleep語句時,線程將被阻塞(Blocked),到sleep結束後,線程進入就緒(Runnable)狀態,等待調度。而線程調度將自行選擇一個線程執行。代碼中只能保證每個線程都運行完整個run函數,但是線程的啟動順序/run函數中每次迴圈的執行順序都不能確定。
線程的5種狀態:
- 新狀態:線程對象已經創建,還沒有在其上調用start()方法
- 可運行狀態:當線程有資格運行,但調度程式還沒有把它選定為運行線程時線程所處的狀態。當start()方法調用時,線程首先進入可運行狀態。線上程運行之後或者從阻塞、等待或睡眠狀態回來後,也返回到可運行狀態。
- 運行狀態:線程調度程式從可運行池中選擇一個線程作為當前線程時線程所處的狀態。這也是線程進入運行狀態的唯一一種方式。
- 等待/阻塞/睡眠狀態:這是線程有資格運行時它所處的狀態。實際上這三個狀態組合為一種,其共同點是:線程仍舊是活的(可運行的),但是當前沒有條件運行。但是如果某件事件出現,他可能返回到可運行狀態。
- 死亡態:當線程的run()方法完成時就認為它死去。這個線程對象也許是活的,但是,它已經不是一個單獨執行的線程。線程一旦死亡,就不能復生。如果在一個死去的線程上調用start()方法,會拋出RuntimeError: threads can only be started once異常。
3.6 線程共用全局變數
在一個進程內的所有線程共用全局變數,多線程之間的數據共用(這點要比多進程好)
缺點就是:可能造成多個線程同時修改一個變數(即線程非安全),可能造成混亂。
import threading
num = 0
def test1():
global num
for i in range(1000000):
num += 1
print('線程1中',num)
def test2():
global num
for i in range(1000000):
num += 1
print('線程2中',num)
p1 = threading.Thread(target=test1)
p1.start()
p2 = threading.Thread(target=test2)
p2.start()
print('主線程中',num)
# 每次數都不同的原因是p1,p2調度時,都還未執行完。
3.7 線程同步-互斥鎖
當多個線程幾乎同時修改某一個共用數據的時候,需要進行同步控制。
線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。
互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性(原子性)。
互斥鎖為資源引入一個狀態:鎖定/非鎖定。某個線程要更改共用數據時,先將其鎖定,此時資源的狀態為鎖定,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成非鎖定,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進入寫入操作,從而保證了多線程情況下數據的正確性。
threading模塊中定義了Lock類,可以方便地處理鎖定。
創建鎖:mutex = threading.Lock()
鎖定:mutex.acquire()
鎖釋放:mutex.release()
import threading
num = 0
mutex = threading.Lock()
def test1():
global num
if mutex.acquire():
for i in range(1000000):
num += 1
mutex.release()
print('線程1中',num)
def test2():
global num
if mutex.acquire():
for i in range(1000000):
num += 1
mutex.release()
print('線程2中',num)
p1 = threading.Thread(target=test1)
p1.start()
p2 = threading.Thread(target=test2)
p2.start()
print('主線程中',num)
3.8 線程同步-死鎖
死鎖(錯誤情況,理解即可):線上程間共用多個資源的時候,如果兩個線程分別占有⼀部分資源並且同時等待對⽅的資源, 就會造成死鎖 。
⼊到了死鎖狀態, 可以使⽤ctrl-z退出。
import threading
import time
class MyThread1(threading.Thread):
def run(self):
if mutexA.acquire():
print(self.name + '---do1---up---')
time.sleep(1)
if mutexB.acquire():
print(self.name + '---do1---down---')
mutexB.release()
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
time.sleep(1)
if mutexB.acquire():
print(self.name + '---do2---up---')
if mutexA.acquire():
print(self.name + '---do2---down---')
mutexA.release()
mutexB.release()
if __name__ == '__main__':
mutexA = threading.Lock()
mutexB = threading.Lock()
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
在多線程程式中,死鎖問題很大一部分是由於線程同時獲取多個鎖造成的。如一個線程獲取了第一個鎖,然後在獲取第二個鎖的時候發生阻塞,那麼這個線程就可能阻塞其他線程的執行,從而導致整個程式假死(兩個人每人一根筷子)
3.9 同步和非同步
同步調⽤:確定調用的順序
按順序購買四大名著
非同步調⽤:不確定順序
你喊你朋友吃飯 , 你朋友說知道了 , 待會忙完去找你 ,你就去做別的了
堵塞和非堵塞
# 多個線程有序執⾏
import threading,time
class Task1(threading.Thread):
def run(self):
while True:
if lock1.acquire():
print('-----Task1-----')
time.sleep(1)
lock2.release()
class Task2(threading.Thread):
def run(self):
while True:
if lock2.acquire():
print('-----Task2-----')
time.sleep(1)
lock3.release()
class Task3(threading.Thread):
def run(self):
while True:
if lock3.acquire():
print('-----Task3-----')
time.sleep(1)
lock1.release()
lock1 = threading.Lock()
#創建另外一把鎖,並且鎖上
lock2 = threading.Lock()
lock2.acquire()
#再創建一把鎖並鎖上
lock3 = threading.Lock()
lock3.acquire()
t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start() t2.start() t3.start()
# 無序執行,方式很多
import threading,time
num = 0
def test1():
global num
mutex.acquire()
for i in range(1000000):
num += 1
mutex.release()
print("1",num)
def test2():
global num
while True:
if mutex.acquire(False):
for i in range(1000000):
num += 1
print("2", num)
mutex.release()
break
else:
print("該幹嘛幹嘛")
mutex = threading.Lock()
p1 = threading.Thread(target=test1)
p2 = threading.Thread(target=test2)
p1.start()
p2.start()
3.10 生產者消費者模式
⽣產者消費者模式:
線上程世界⾥, ⽣產者就是⽣產數據的線程, 消費者就是消費數據的線程(做包子,吃包子)。
經常會出現生產數據的速度大於消費數據的速度,或者生產速度跟不上消費速度。
⽣產者消費者模式是通過⼀個容器(緩衝區)來解決⽣產者和消費者的強耦合問題(耦合指關係密切)。
例如兩個線程共同操作一個列表,一個放數據,一個取數據。
⽣產者和消費者彼此之間不直接通訊, ⽽通過阻塞隊列來進⾏通訊。
Python的Queue模塊:實現了3種類型的隊列來實現線程同步,包括:
FIFO(先⼊先出) 隊列 Queue,
LIFO(後⼊先出) 棧 LifoQueue,
優先順序隊列 PriorityQueue
區別在於隊列中條目檢索的順序不同:
在FIFO隊列中,按照先進先出的順序檢索條目。
在LIFO隊列中,最後添加的條目最先檢索到(操作類似一個棧)。
在優先順序隊列中,條目被保存為有序的(使用heapq模塊)並且最小值的條目被最先檢索。
這些隊列都實現了鎖原語(可以理解為原⼦操作, 即要麼不做, 要麼就做完) , 能夠在多線程中直接使⽤。
現階段只要求掌握其中一種,FIFO隊列。
class queue.Queue(maxsize=0):
FIFO隊列的構造器。maxsize為一個整數,表示隊列的最大條目數,可用來限制記憶體的使用。
一旦隊列滿,插入將被阻塞直到隊列中存在空閑空間。如果maxsize小於等於0,隊列大小為無限。maxsize預設為0。
import threading
import time
from queue import Queue
class Pro(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize()<1000:
for i in range(100):
count = count + 1
msg = '生成產品' + str(count)
queue.put(msg)#隊列中添加新產品
print(msg)
time.sleep(1)
class Con(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消費了' + queue.get()
print(msg)
time.sleep(1)
if __name__ == "__main__":
queue = Queue()
#創建一個隊列。線程中能用,進程中不能使用
for i in range(500):#創建500個產品放到隊列里
queue.put(‘初始產品’ + str(i))#字元串放進隊列
for i in range(2):#創建了兩個線程
p = Pro()
p.start()
for i in range(5):#5個線程
c = Con()
c.start()
3.11 ThreadLocal變數
⼀個ThreadLocal變數雖然是全局變數, 但每個線程都只能讀寫⾃⼰線程的獨⽴副本, 互不⼲擾。ThreadLocal解決了參數在⼀個線程中各個函數之間互相傳遞的問題。
可以理解為全局變數local_school是⼀個dict, 可以綁定其他變數。
ThreadLocal最常⽤的地⽅就是為每個線程綁定⼀個資料庫連接, HTTP請求, ⽤戶身份信息等, 這樣⼀個線程的所有調⽤到的處理函數都可以⾮常⽅便地訪問這些資源。
import threading
#創建一個全局的對象
local_school = threading.Local()
def process_student():
#獲取當前線程關聯的student
std = local_school.student
print("Hello %s (in %s)" %(std,threading.current_thread().name))
def process_thread(name):
#綁定ThreadLocal的Student
local_school.student = name
process_student()
t1 = threading.Thread(target=process_thread,args=('zhangsan',),name='t1')
t2 = threading.Thread(target=process_thread,args=('老王',),name='t2')
t1.start()
t2.start()