一、進程和線程 進程 線程 線程進程的關係區別: 1、一個程式至少有一個進程,一個進程至少有一個線程.(進程可以理解成線程的容器) 2、進程在執行過程中擁有獨立的記憶體單元,而多個線程共用記憶體,從而極大地提高了程式的運行效率。 3、線程在執行過程中與進程還是有區別的。每個獨立的線程有一個程式運行的入口 ...
一、進程和線程
進程
假如有兩個程式A和B,程式A在執行到一半的過程中,需要讀取大量的數據輸入(I/O操作),
而此時CPU只能靜靜地等待任務A讀取完數據才能繼續執行,這樣就白白浪費了CPU資源。
是不是在程式A讀取數據的過程中,讓程式B去執行,當程式A讀取完數據之後,讓
程式B暫停,然後讓程式A繼續執行?
當然沒問題,但這裡有一個關鍵詞:切換
既然是切換,那麼這就涉及到了狀態的保存,狀態的恢復,加上程式A與程式B所需要的系統資
源(記憶體,硬碟,鍵盤等等)是不一樣的。自然而然的就需要有一個東西去記錄程式A和程式B
分別需要什麼資源,怎樣去識別程式A和程式B等等,所以就有了一個叫進程的抽象
進程定義:
進程就是一個程式在一個數據集上的一次動態執行過程。
進程一般由程式、數據集、進程式控制制塊三部分組成。
我們編寫的程式用來描述進程要完成哪些功能以及如何完成;
數據集則是程式在執行過程中所需要使用的資源;
進程式控制制塊用來記錄進程的外部特征,描述進程的執行變化過程,系統可以利用它來控制和管理進程,它是系
統感知進程存在的唯一標誌。
舉一例說明進程:
想象一位有一手好廚藝的電腦科學家正在為他的女兒烘製生日蛋糕。他有做生日蛋糕的食譜,廚房裡有所需
的原料:麵粉、雞蛋、糖、香草汁等。在這個比喻中,做蛋糕的食譜就是程式(即用適當形式描述的演算法)電腦科學家就是處理器(cpu),
而做蛋糕的各種原料就是輸入數據。進程就是廚師閱讀食譜、取來各種原料以及烘製蛋糕等一系列動作的總和。
現在假設電腦科學家的兒子哭著跑了進來,說他的頭被一隻蜜蜂蟄了。電腦科學家就記錄下他
照著食譜做到哪兒了(保存進程的當前狀態),然後拿出一本急救手冊,按照其中的指示處理蟄傷。這
里,我們看到處理機從一個進程(做蛋糕)切換到另一個高優先順序的進程(實施醫療救治),每個進程
擁有各自的程式(食譜和急救手冊)。當蜜蜂蟄傷處理完之後,這位電腦科學家又回來做蛋糕,從他
離開時的那一步繼續做下去。
線程
線程的出現是為了降低上下文切換的消耗,提高系統的併發性,並突破一個進程只能幹一樣事的缺陷,
使到進程內併發成為可能。
假設,一個文本程式,需要接受鍵盤輸入,將內容顯示在屏幕上,還需要保存信息到硬碟中。若只有
一個進程,勢必造成同一時間只能幹一樣事的尷尬(當保存時,就不能通過鍵盤輸入內容)。若有多
個進程,每個進程負責一個任務,進程A負責接收鍵盤輸入的任務,進程B負責將內容顯示在屏幕上的
任務,進程C負責保存內容到硬碟中的任務。這裡進程A,B,C間的協作涉及到了進程通信問題,而且
有共同都需要擁有的東西-------文本內容,不停的切換造成性能上的損失。若有一種機制,可以使
任務A,B,C共用資源,這樣上下文切換所需要保存和恢復的內容就少了,同時又可以減少通信所帶
來的性能損耗,那就好了。是的,這種機制就是線程。
線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程式執行過程中的最小單元,由線程ID、程式
計數器、寄存器集合和堆棧共同組成。線程的引入減小了程式併發執行時的開銷,提高了操作系統的併發
性能。線程沒有自己的系統資源。
線程進程的關係區別:
1、一個程式至少有一個進程,一個進程至少有一個線程.(進程可以理解成線程的容器)
2、進程在執行過程中擁有獨立的記憶體單元,而多個線程共用記憶體,從而極大地提高了程式的運行效率。
3、線程在執行過程中與進程還是有區別的。每個獨立的線程有一個程式運行的入口、順序執行序列和
程式的出口。但是線程不能夠獨立執行,必須依存在應用程式中,由應用程式提供多個線程執行控制。
4、進程是具有一定獨立功能的程式關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調
度的一個獨立單位.
線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程
自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程式計數器,一組寄存器和棧)但是
它可與同屬一個進程的其他的線程共用進程所擁有的全部資源.
一個線程可以創建和撤銷另一個線程;同一個進程中的多個線程之間可以併發執行.
python的GIL
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,無論你啟多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行
二、python的線程與threading模塊
1、線程的兩種調用方式
threading 模塊建立在thread模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊通過對thread進行二次封裝,提供了更方便的api來處理線程。
直接調用:
import threading
import time
def sayhi(num): #定義每個線程要運行的函數
print("running on number:%s" %num)
time.sleep(3)
if __name__ == '__main__':
t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例
t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個線程實例
t1.start() #啟動線程
t2.start() #啟動另一個線程
print(t1.getName()) #獲取線程名
print(t2.getName())
繼承式調用:
import threading
import time
class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num = num
def run(self):#定義每個線程要運行的函數
print("running on number:%s" %self.num)
time.sleep(3)
if __name__ == '__main__':
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
print("ending......")
三、threading.thread的實例方法
join&Daemon方法
import threading
from time import ctime,sleep
import time
def ListenMusic(name):
print ("Begin listening to %s. %s" %(name,ctime()))
sleep(3)
print("end listening %s"%ctime())
def RecordBlog(title):
print ("Begin recording the %s! %s" %(title,ctime()))
sleep(5)
print('end recording %s'%ctime())
threads = []
t1 = threading.Thread(target=ListenMusic,args=('殺手',))
t2 = threading.Thread(target=RecordBlog,args=('python線程',))
threads.append(t1)
threads.append(t2)
if __name__ == '__main__':
for t in threads:
#t.setDaemon(True) #註意:一定在start之前設置
t.start()
# t.join()
# t1.join()
t1.setDaemon(True)
#t2.join()########考慮這三種join位置下的結果?
print ("all over %s" %ctime())
join():在子線程完成運行之前,這個子線程的父線程將一直被阻塞。
setDaemon(True):
將線程聲明為守護線程,必須在start() 方法調用之前設置, 如果不設置為守護線程程式會被無限掛起。這個方法基本和join是相反的。
當我們 在程式運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程 就分兵兩路,分別運行,那麼當主線程完成
想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成後再退出。但是有時候我們需要的是 只要主線程
完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法啦
其他方法:
# run(): 線程被cpu調度後自動執行線程對象的run方法
# start():啟動線程活動。
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設置線程名。
threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變數。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
四、同步鎖(Lock)
import threading
import time
def addNum():
global num #在每個線程中都獲取這個全局變數
#num-=1
temp=num
#print('--get num:',num )
time.sleep(0.1)
num =temp-1 #對此公共變數進行-1操作
num = 1000 #設定一個共用變數
thread_list = []
for i in range(1000):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有線程執行完畢
t.join()
print('final num:', num )
觀察:time.sleep(0.1) /0.001/0.0000001 結果分別是多少?
多個線程都在同時操作同一個共用資源,所以造成了資源破壞,怎麼辦呢?(join會造成串列,失去所線程的意義)
我們可以通過同步鎖來解決這種問題
R=threading.Lock()
####
def sub():
global num
R.acquire()
temp=num-1
time.sleep(0.1)
num=temp
R.release()
五、遞歸鎖
線上程間共用多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所以這兩個線程在無外力作用下將一直等待下去。死鎖的例子
import threading,time
class myThread(threading.Thread):
def doA(self):
lockA.acquire()
print(self.name,"gotlockA",time.ctime())
time.sleep(3)
lockB.acquire()
print(self.name,"gotlockB",time.ctime())
lockB.release()
lockA.release()
def doB(self):
lockB.acquire()
print(self.name,"gotlockB",time.ctime())
time.sleep(2)
lockA.acquire()
print(self.name,"gotlockA",time.ctime())
lockA.release()
lockB.release()
def run(self):
self.doA()
self.doB()
if __name__=="__main__":
lockA=threading.Lock()
lockB=threading.Lock()
threads=[]
for i in range(5):
threads.append(myThread())
for t in threads:
t.start()
for t in threads:
t.join()#等待線程結束,後面再講。
解決辦法:使用遞歸鎖,將
lockA=threading.Lock()
lockB=threading.Lock()<br>#--------------<br>lock=threading.RLock()
為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。
應用
import time
import threading
class Account:
def __init__(self, _id, balance):
self.id = _id
self.balance = balance
self.lock = threading.RLock()
def withdraw(self, amount):
with self.lock:
self.balance -= amount
def deposit(self, amount):
with self.lock:
self.balance += amount
def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的場景
with self.lock:
interest=0.05
count=amount+amount*interest
self.withdraw(count)
def transfer(_from, to, amount):
#鎖不可以加在這裡 因為其他的其它線程執行的其它方法在不加鎖的情況下數據同樣是不安全的
_from.withdraw(amount)
to.deposit(amount)
alex = Account('alex',1000)
yuan = Account('yuan',1000)
t1=threading.Thread(target = transfer, args = (alex,yuan, 100))
t1.start()
t2=threading.Thread(target = transfer, args = (yuan,alex, 200))
t2.start()
t1.join()
t2.join()
print('>>>',alex.balance)
print('>>>',yuan.balance)
六、同步條件(Event)
import threading,time
class Boss(threading.Thread):
def run(self):
print("BOSS:今晚大家都要加班到23:00。")
print(event.isSet())
event.set()
time.sleep(5)
print("BOSS:<23:00>可以下班了。")
print(event.isSet())
event.set()
class Worker(threading.Thread):
def run(self):
event.wait()
print("Worker:哎……苦逼!")
time.sleep(1)
event.clear()
event.wait()
print("Worker:OhYeah!")
if __name__=="__main__":
event=threading.Event()
threads=[]
for i in range(5):
threads.append(Worker())
threads.append(Boss())
for t in threads:
t.start()
for t in threads:
t.join()
七、信號量(Semaphore)
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器為 0時,acquire()將阻塞線程至同步鎖定狀態,直到其他線程調用release()。(類似於停車位的概念)
BoundedSemaphore與Semaphore的唯一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,如果超過了將拋出一個異常。
import threading,time
class myThread(threading.Thread):
def run(self):
if semaphore.acquire():
print(self.name)
time.sleep(3)
semaphore.release()
if __name__=="__main__":
semaphore=threading.Semaphore(5)
thrs=[]
for i in range(100):
thrs.append(myThread())
for t in thrs:
t.start()
八、多線程利器-----隊列(queue)
1、列表是不安全的數據結構
import threading,time
li=[1,2,3,4,5]
def pri():
while li:
a=li[-1]
print(a)
time.sleep(1)
try:
li.remove(a)
except Exception as e:
print('----',a,e)
t1=threading.Thread(target=pri,args=())
t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()
思考:如何通過對列來完成上述功能?
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
2、queue隊列類的方法
創建一個“隊列”對象
import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限。
將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,預設為
1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。
將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,預設為True。如果隊列為空且block為True,
get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
Python Queue模塊有三種隊列及構造函數:
1、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize)
2、LIFO類似於堆,即先進後出。 class queue.LifoQueue(maxsize)
3、還有一種是優先順序隊列級別越低越先出來。 class queue.PriorityQueue(maxsize)
此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小
q.empty() 如果隊列為空,返回True,反之False
q.full() 如果隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 相當q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味著等到隊列為空,再執行別的操作
3、other mode:
import queue
#先進後出
q=queue.LifoQueue()
q.put(34)
q.put(56)
q.put(12)
#優先順序
# q=queue.PriorityQueue()
# q.put([5,100])
# q.put([7,200])
# q.put([3,"zhurui"])
# q.put([4,{"name":"simon"}])
while 1:
data=q.get()
print(data)
生產者消費者模型:
為什麼要使用生產者和消費者模式
線上程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為瞭解決這個問題於是引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前臺,而客戶去飯菜也不需要不找廚師,直接去前臺領取即可,這也是一個結耦的過程。
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count <10:
print("making........")
time.sleep(random.randrange(3))
q.put(count)
print('Producer %s has produced %s baozi..' %(name, count))
count +=1
#q.task_done()
#q.join()
print("ok......")
def Consumer(name):
count = 0
while count <10:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
#q.task_done()
#q.join()
print(data)
print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
else:
print("-----no baozi anymore----")
count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()
九、多進程模塊 multiprocessing
由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。
multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程式內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境
1、進程的調用
調用方式1
from multiprocessing import Process
import time
def f(name):
time.sleep(1)
print('hello', name,time.ctime())
if __name__ == '__main__':
p_list=[]
for i in range(3):
p = Process(target=f, args=('alvin',))
p_list.append(p)
p.start()
for i in p_list:
p.join()
print('end')
調用方式2
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
#self.name = name
def run(self):
time.sleep(1)
print ('hello', self.name,time.ctime())
if __name__ == '__main__':
p_list=[]
for i in range(3):
p = MyProcess()
p.start()
p_list.append(p)
for p in p_list:
p.join()
print('end')
例子3:
from multiprocessing import Process
import os
import time
def info(title):
print("title:",title)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main process line')
time.sleep(1)
print("------------------")
p = Process(target=info, args=('yuan',))
p.start()
p.join()