【文章中代碼未縮進,剛開始使用博客,後期會優化~】 主線程下的線程之間是可以通信的,但是父進程下的子進程之間不能主動通信,但是子進程想要實現通信也是可以的,可以選擇折中的方法來實現,比如multiprocessing.Queue,用法與線程中的queue基本一致,直接上例子:import threa ...
【文章中代碼未縮進,剛開始使用博客,後期會優化~】
主線程下的線程之間是可以通信的,但是父進程下的子進程之間不能主動通信,但是子進程想要實現通信也是可以的,可以選擇折中的方法來實現,比如multiprocessing.Queue,用法與線程中的queue基本一致,直接上例子:
import threading
from multiprocessing import Process,Queue
import time
def thre(qq):
qq.put([1,'xixi',2])
if __name__ =='__main__':
q = Queue()
p = Process(target=thre,args=(q,))#進程中,因為進程記憶體是獨立的,所以不能相互調用,必須傳入參數,這個q其實是複製了一份Queue的實例,如果和線程一樣不傳參數,就會報錯** not find。。。因為記憶體不共用。
#p =threading.Thread(target=thre) #線程中,不傳參數是可以調用函數thre,因為他們是在同一個記憶體地址下操作【上面改為def thre():】,當然傳參數也沒問題。
p.start()
print(q.get())
#進程之間想要有聯繫,主動無法聯繫,這是硬傷,就如qq和word一樣,但是如果非要他們有聯繫,就是從word複製文字到qq里(或者qq複製圖片文字到word裡面這樣),這樣貌似兩者有聯繫,實際上只是克隆了那段文字的關係,但是看起來好像就有聯繫了,那麼python中process之間的通信就是可以考慮通過 Queue來實現,Queue內部操作其實就是通過pickle的功能來實現傳參數等各種聯繫的
還有一個是pipe:通過管道來傳遞,也是建立一個pipe的實例化對象。
from multiprocessing import Process,Pipe
def f(conn):
conn.send('balabala')
print(conn.recv())
if __name__=='__main__':
parent_conn,child_conn=Pipe()#實例化後是返回兩個值,一個是父接頭一個是子接頭,因為是管道。
p = Process(target=f,args=(child_conn,))
p.start()
print(parent_conn.recv())
parent_conn.send('babababa')
這樣也能實現數據的傳遞,但都不是共用
進程之間要實現共用,需要用manager。
from multiprocessing import Process,Manager
import time,os
def thre(dd,ll):
dd[os.getpid()] = os.getppid()
ll.append(os.getpid())
print(ll)
print(dd)
if __name__ =='__main__':
manager = Manager()
d = manager.dict()
l = manager.list(range(3))
t_list = []
for i in range(10):
p = Process(target=thre,args=(d,l))
p.start()
t_list.append(p)
for res in t_list:
res.join()
此時字典 d 和 列表 l,他們的數據同時都可以被進程修改覆蓋,只不過我這裡用的是os.getpid()獲取的數據不一致,如果是一致的,那麼最終字典只有一個k-v,列表是10個一樣的數據。
進程鎖的存在是為了輸出同一個屏幕不要亂。。。僅此而已
進程池的作用和線程中的信號量差不多,同一時間允許幾個進程同時運行
其中有 apply 和apply_async,一個是串列操作,一個是並行操作。
from multiprocessing import Pool
import time,os
def thre(dd):
time.sleep(1)
print('the process:',os.getpid())
return dd+100
def g(c):
print('haha',c,os.getpid())
#start_time = time.time()
# l=[]
if __name__ =='__main__':
p_ = Pool(3)#允許同時運行的進程數為3。
print(os.getpid())
for i in range(10):
p_.apply_async(func=thre,args=(i,),callback=g)【callback是回調函數,傳的參數是thre的返回值】
p_.close()
p_.join()#這裡如果不加join,在並行中會直接close,程式會直接關閉,加了join,主進程就會等待子進程結束以後最後才關閉,這個只在並行中有用,串列中沒有什麼作用。一定要先close再join
協程:可以實現高併發,本質上就是單線程,一個cpu支持上萬個協程併發
gevent(自動觸發) 和 greenlet(手動觸發)
import gevent
def fun1():
print('runing 1 ...')
gevent.sleep(2)#模仿io
print('running 2 ...')
def fun2():
print('running 3 ...')
gevent.sleep(3)
print('running 4')
def fun3():
print('running 5 ...')
gevent.sleep(0)
print('end?')
gevent.joinall([gevent.spawn(fun1),gevent.spawn(fun2),gevent.spawn(fun3)])
運行結果:
runing 1 ...
running 3 ...
running 5 ...
end?
running 2 ...
running 4
----------------------
sleep相當於觸發的按鈕,出現一次sleep,就去找下一個函數中的內容列印等操作,sleep內的時間相當於他卡幾次,sleep(3)相當於卡3秒,如果其他已經沒卡著,就馬上執行沒卡著的語句,知道最後回來等到時間結束執行最後這個語句。協程用於多併發爬蟲中效果很好。
import gevent,time
import urllib.request as ul
from gevent import monkey
monkey.patch_all()#這個標識代表把所有程式都當做io直接切換操作,不加這句話,因為gevent不會辨認出urllib的有io操作,相當於串列操作。
def f(url):
print('GET %s'%url)
res = ul.urlopen(url).read()
print('recv bytes %s from %s'%(len(res),url))
time_start = time.time()
l=['https://www.python.org/','http://km.58.com/','http://kan.sogou.com/dongman/','http://news.sohu.com/']
for i in l:
f(i)
print('同步時間:',time.time()-time_start)
async_time = time.time()
gevent.joinall([gevent.spawn(f,'https://www.python.org/'),
gevent.spawn(f,'http://km.58.com/'),
gevent.spawn(f,'http://kan.sogou.com/dongman/'),
gevent.spawn(f,'http://news.sohu.com/')])
print('非同步時間:',time.time()-async_time)
運行結果:
GET https://www.python.org/
recv bytes 48860 from https://www.python.org/
GET http://km.58.com/
recv bytes 104670 from http://km.58.com/
GET http://kan.sogou.com/dongman/
recv bytes 12713 from http://kan.sogou.com/dongman/
GET http://news.sohu.com/
recv bytes 170935 from http://news.sohu.com/
同步時間: 3.780085563659668
GET https://www.python.org/
GET http://km.58.com/
GET http://kan.sogou.com/dongman/
GET http://news.sohu.com/
recv bytes 12690 from http://kan.sogou.com/dongman/
recv bytes 170935 from http://news.sohu.com/
recv bytes 104670 from http://km.58.com/
recv bytes 48860 from https://www.python.org/
非同步時間: 2.5934762954711914
用戶空間和內核空間(kernel)
現在操作系統中都是採用虛擬存儲器,操作系統的核心是內核,獨立於普通的應用程式,可以訪問受保護的記憶體空間,也有訪問硬體設備的許可權,為了保證用戶進程不能直接操作內核(kernel),保證內核的安全,操作系統把虛擬空間分為兩部分,一部分為內核空間,一部分為用戶空間。
進程切換
為了控制進程的執行,內核必須有能力掛起在CPU上運行的進程,並且恢復以前掛起的某個進程的執行,這種行為稱作進程切換,因此,任何進程都是在操作系統內核的支持下運行的,與內核緊密相連。
從一個進程的運行轉到另一個進程上運行,其實就是保存上下文就切換了。下次再來又從之前保存的位置開始。
進程的阻塞:
正式執行的進程,由於期待的某件事情並未發生,如請求系統資源失敗等待,等待某種操作的完成,新數據尚未達到或無新工作開始等,則有系統自動執行阻塞原語,使自己由原來的運行狀態轉為阻塞狀態暫停等待()
。可見,進程的阻塞是進程自身的一種主動行為,也因此只有處於運行狀態的進程(獲得CPU),才可能將其轉為阻塞狀態,當進程進入阻塞狀態時候,不耗費CPU資源的。
緩存I/O
又被成為標準IO,大多數文件系統預設I/O操作都是緩存I/O,在Linux的緩存I/O機制當中,操作系統會將I/O的數據緩存在文件系統的頁緩存中,也就是說,文件數據會被拷貝到系統內核的緩衝區中,然後再從系統內核的緩衝區拷貝到用戶的進程記憶體里也就是應用程式的地址空間。缺點就是數據會在用戶進程應用程式地址空間和內核空間反覆拷貝操作,這時對於CPU和記憶體的開銷很大。
I/O模式
同步IO和非同步IO:
同步IO中有:阻塞IO(blocking I/O),非阻塞IO(non-blocking I/O),多路復用IO(I/O multiplexing) 信號驅動(實際中不常用。在此暫時不記錄筆記)
非同步I/O(asynchronous I/O)
阻塞IO:發起請求,然後等待數據準備(此時進程阻塞等待),直到數據準備好接受時,又到內核空間開始copy給用戶進程,此時又一次阻塞等待,直到數據全部發給用戶進程(客戶端)。
非阻塞IO:發起請求後,瘋狂發送驗證,數據未準備好時,並不會阻塞block,而是返回一個error給用戶進程,用戶進程會驗證是否error,是就繼續發出請求,來回驗證,(此時由於進程沒有阻塞,還可以乾其他事,)不是就到了內核空間開始copy數據,此時其實還是阻塞,如果數據小會很快,數據大還是會感受到卡。最後用戶收到完整數據。
多路復用I/O:一次發起幾百次請求鏈接,無論哪條鏈接有數據回覆,都會通知用戶進程開始接受數據,此時那幾條鏈接又開始進行內核copy直到進程收到完整數據(其實這裡也是阻塞的)。這個模式的核心其實是用非阻塞IO的方式來驅動,所以形成多路復用,在用戶看來已經是多併發了。
非同步I/O:這個就牛逼了,他發起請求,當場就收到回覆‘去乾你其他的事’,此時該進程開始其他部分運行,並未有任何阻塞,收到數據時,直接後臺開始內核copy,全部搞完以後直接‘送快遞到家門口’,給一個信號通知,用戶進程順手就接受了數據,此時整個進程根本沒有任何阻塞過程!這就是非同步IO。
selectors
selectors中涵蓋了select,poll,epoll,詳細實例:
import selectors,socket
sel = selectors.DefaultSelector()
def accept(sock,mask):
conn,addr = sock.accept()
conn.setblocking(False)
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
data = conn.recv(1024).decode()
if data:
conn.send(('haha+%s'%data).encode())
else:
print('發什麼?',conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('localhost',5000))
sock.listen(1000)
sel.register(sock,selectors.EVENT_READ,accept)
while True:
events= sel.select()
for key,mask in events:
callback = key.data
callback(key.fileobj,mask)
可以進行多併發運行。