進程: 1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 import multiprocessing,threading,time 5 6 def run(name): 7 t=threading.Thread(ta ...
進程:
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 import multiprocessing,threading,time 5 6 def run(name): 7 t=threading.Thread(target=run2)#創建新線程 8 t.start() 9 print('進程[%s],列印中...'%name) 10 time.sleep(1) 11 12 def run2(): 13 print(threading.get_ident())#列印線程ID 14 time.sleep(2) 15 16 17 if __name__ == '__main__': 18 for i in range(10): 19 p=multiprocessing.Process(target=run,args=('第[%s]個進程'%i,))#創建新進程 20 p.start()View Code
進程號:
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 5 import multiprocessing,threading,time,os 6 from multiprocessing import Process#從 multprocessing 打開 Process 7 def info_l(file): 8 print('當前模塊名:',__name__) 9 print('父進程ID:',os.getppid()) 10 print('進程ID:',os.getpid()) 11 print('\n\n') 12 13 def f(name): 14 print('查看:',name) 15 info_l('相關列表') 16 17 18 if __name__ == '__main__': 19 info_l('主進程相關列表') 20 p=Process(target=f,args=('當前進程',)) 21 p.start() 22 p.join()View Code
進程鎖:
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 5 from multiprocessing import Process,Lock # Lock 進程鎖通訊中間件 6 7 def f(lock,i): 8 lock.acquire()#進程鎖 9 print('第[%s]個進程'%i) 10 lock.release()#解鎖 11 if __name__ =='__main__': 12 lock=Lock()#進程鎖對象 13 for i in range(10): 14 p=Process(target=f,args=(lock,i)).start()View Code
進程之間通訊:
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 5 import multiprocessing,queue,threading 6 from multiprocessing import Process,Queue # Queue 進程通訊中間件 7 8 9 ''' 線程 之間共用隊列 10 def f(): 11 q.put([1,None,'加入數據'])#隊列 12 if __name__ =='__main__': 13 q=queue.Queue()#線程隊列 14 #q=Queue()#進程隊列 15 p=threading.Thread(target=f,)#創建線程 16 p.start() 17 print(q.get())#輸出,取出的 18 p.join() 19 ''' 20 def f(q):#存入q對象 21 q.put([1,None,'加入數據'])#隊列 22 if __name__ =='__main__': 23 q=Queue()#進程隊列 24 p=Process(target=f,args=(q,))#創建進程 25 p.start() 26 print(q.get())#輸出,取出的 27 p.join()View Code
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 import os 5 from multiprocessing import Process,Pipe,Manager # Pipe 管道 進程通訊中間件 6 7 8 def f(d,l): 9 d[os.getpid()]=os.getpid()#修改字典 10 l.append(os.getpid())#添加列表內容 11 print(d) 12 print(l) 13 14 15 if __name__ =='__main__': 16 with Manager() as manager: 17 d=manager.dict()#創建一個進程之間可修改的字典 18 l=manager.list(range(5))#創建一個進程之間可修改的列表 19 p_list=[]#join使用 20 for i in range(10): 21 p=Process(target=f,args=(d,l))#創建進程傳入數據, 22 p.start() 23 p_list.append(p) 24 for r in p_list:#等待進程完成 25 r.join() 26 print(d) 27 print(l)View Code
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 5 from multiprocessing import Process,Pipe # Pipe 管道 進程通訊中間件 6 7 def f(conn):#存入conn對象 8 conn.send(['子進程發送信息','....']) 9 print('收到父進程的信息:',conn.recv()) 10 conn.close() 11 if __name__ =='__main__': 12 parent_conn,child_conn=Pipe()#生成一個管道,返回兩個值,管理雙端 13 p=Process(target=f,args=(child_conn,))#創建進程 14 p.start() 15 print('收到子進程的信息:',parent_conn.recv()) 16 parent_conn.send(['父進程發送信息']) 17 p.join()View Code
進程池:
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 #__author__=2017/6/23 5 import time,os 6 from multiprocessing import Process,Lock,Pool # Pool 進程池通訊中間件 7 8 def Foo(i): 9 print('第[%s]個進程,ID:'%i,os.getpid()) 10 time.sleep(3) 11 return i+100 12 def Bar(arg): 13 print('回調>>>>:',arg,os.getpid()) 14 if __name__ =='__main__': 15 #pool=Pool(processes=5)#定義一個進程池 表示允許進程池同時放入5個進程 16 pool=Pool(5)#定義一個進程池 表示允許進程池同時放入5個進程 17 for i in range(10): 18 #pool.apply(func=Foo,args=(i,))#使用進程池創建進程 串列 19 #pool.apply_async(func=Foo,args=(i,))#使用進程池創建進程 並行 20 pool.apply_async(func=Foo,args=(i,),callback=Bar)#回調 21 22 print('結束') 23 #pool.join() 24 pool.close()#一定要先關閉進程池 25 pool.join()#後進行joinView Code
協程:
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 #python 5 #2017/6/24 10:10 6 #__author__='Administrator' 7 8 import time 9 import queue 10 def consumer(name):#消費者函數 11 print('[%s]消費產品中.......'%name) 12 while True: 13 new_b=yield #跳轉點 14 print('[%s] 消費 [%s]'%(name,new_b)) 15 def producer():#生產者函數 16 r=con.__next__() 17 r2=con2.__next__() 18 n=0 19 while n<10: 20 n+=1 21 con.send(n)#發送給消費者 22 con2.send(n) 23 print('\033[32;1m[生產者]\033[0m生產產品[%s]'%n) 24 25 if __name__=='__main__': 26 con=consumer('消費者A') 27 con2=consumer('消費者B') 28 p=producer()View Code
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 #python 5 #2017/6/24 10:31 6 #__author__='Administrator' 7 #import greenlet 8 from greenlet import greenlet 9 10 def test1(): 11 print('函數一: 12') 12 ger2.switch()#進行協程切換 13 print('函數一: 34') 14 ger2.switch() 15 16 def test2(): 17 print('函數二: 56') 18 ger1.switch() 19 print('函數二: 78') 20 ger1.switch() 21 22 ger1=greenlet(test1)#創建協程 23 ger2=greenlet(test2) 24 ger1.switch()View Code
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 #python 5 #2017/6/24 10:47 6 #__author__='Administrator' 7 8 import gevent 9 def func1(): 10 print('func1 reading....') 11 gevent.sleep(2) 12 print('func1 reading two.. end') 13 14 def func2(): 15 print('func2 reading....') 16 gevent.sleep(0) 17 print('func2 reading two.. end') 18 def func3(): 19 print('func3 reading....') 20 gevent.sleep(1) 21 print('func3 reading two.. end') 22 23 gevent.joinall([ 24 gevent.spawn(func1), 25 gevent.spawn(func2), 26 gevent.spawn(func3) 27 ])View Code
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 #python 5 #2017/6/24 14:03 6 #__author__='Administrator' 7 from urllib import request 8 import gevent,time 9 from gevent import monkey 10 monkey.patch_all()#對所有的I/O操作進行標記 11 12 def f(url):#爬取網頁 13 print('網址: ',url) 14 resp=request.urlopen(url)#打開網頁 15 data=resp.read()#讀取網頁 16 print('網址:[%s]的網頁數據大小:[%s]'%(url,len(data))) 17 18 urls=['https://www.python.org/', 19 'https://hao.360.cn/', 20 'https://www.yahoo.com/'] 21 22 time_start=time.time()#同步 串列開始時間 23 for url in urls: 24 f(url) 25 print('同步時長:',time.time()-time_start) 26 27 time_start_asy=time.time()#非同步 並行開始時間 28 gevent.joinall([ 29 gevent.spawn(f,'https://www.python.org/'), 30 gevent.spawn(f,'https://hao.360.cn/'), 31 gevent.spawn(f,'https://www.yahoo.com/') 32 ]) 33 print('異域步時長:',time.time()-time_start_asy)View Code
協程socket_server 實現併發
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 #python 5 #2017/6/24 14:42 6 #__author__='Administrator' 7 8 import sys 9 import socket 10 import time 11 import gevent 12 13 from gevent import socket,monkey 14 monkey.patch_all() 15 16 17 def server(port): 18 s = socket.socket()#socket 對象 19 s.bind(('0.0.0.0', port))#服務端,bind IP 埠 20 s.listen(500) 21 print('監聽中....') 22 while True: 23 cli, addr = s.accept() 24 gevent.spawn(handle_request, cli)#創建一個新協程來 25 26 27 28 def handle_request(conn): 29 try: 30 while True: 31 data = conn.recv(1024) 32 print("recv:", data) 33 conn.send(data) 34 if not data: 35 conn.shutdown(socket.SHUT_WR) 36 37 except Exception as ex: 38 print(ex) 39 finally: 40 conn.close() 41 if __name__ == '__main__': 42 server(8001)View Code
select :
socket_server
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 #python 5 #2017/6/24 19:34 6 #__author__='Administrator' 7 8 import select,socket,sys ,queue 9 10 s=socket.socket()#實例化一個連接對象 11 s.setblocking(0)#設置成非阻塞 12 server_addr=('localhost',9500)#設置綁定的 IP 埠 13 s.bind(server_addr)#連接對象綁定IP 埠 14 s.listen(100)#隊列 可連接數量 15 inputs=[s,]#首先要監測本身 16 17 outputs=[]#發送列表 18 19 meg_queues={} #發送 連接對象的隊列集合 字典 20 21 while True: 22 print('監聽中......') 23 readable,writeable,exeptional=select.select(inputs,outputs,inputs)#生成select 對象,返回三個列表 連接,發關,錯誤 24 25 for i in readable: #i為一個socket 26 if i is s:#如果i 是s 表示有新 連接 進來 27 conn,client_addr=i.accept()#建立一個新連接 28 print('接入一個新連接...',client_addr) 29 conn.setblocking(0)#也設成非阻塞 30 inputs.append(conn)#加入select,的連接列表,避免出現阻塞 31 meg_queues[conn]=queue.Queue()#創建一個隊列 添加到字典 32 else: 33 try: 34 data=i.recv(1024)#如果不是新連接就收數據 35 except Exception as e: 36 print(e) 37 if data: #如果數據不為空 38 print('[%s] 發來的數據 [%s]'%(i.getpeername,data)) 39 meg_queues[i].put(data)#當前連接的消息隊列加入數據 40 if i not in outputs:#如果當前連接沒有在發送列表內,就加入發送列表 41 outputs.append(i) 42 else: 43 print('客戶端已經斷開了....')#開始清理工作 44 if i in outputs:#在發送列表 45 outputs.remove(i)#在發送列表內刪除 46 inputs.remove(i)#在連接列表內刪除 47 del meg_queues[i]#在隊列字典內刪除 48 49 for w in writeable:#迴圈發送列表 50 try: 51 msg=meg_queues[w].get_nowait()#取出隊列中的數據,判斷 52 except queue.Empty:#如果數據為空 53 outputs.remove(w)##從發送列表內刪除 54 else: 55 w.send(msg)#發送 56 57 for e in exeptional:#迴圈錯誤列表 58 print('連接[%s]出錯!'%e.getpeername) 59 inputs.remove(e)##從發送列表內刪除 60 if e in outputs:#在發送列表 61 outputs.remove(e)#在發送列表內刪除 62 e.close() 63 del meg_queues[e]#在隊列字典內刪除View Code
socket_client
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # Author calmyan 4 #python 5 #2017/6/24 19:23 6 #__author__='Administrator' 7 8 import socket 9 10 server_addr=('localhost',9500)#設置綁定的 IP 埠 11 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 12 s.connect(server_addr) 13 while True: 14 msg = bytes(input(">>:"),encoding="utf8")#定義一個數據 消息 15 if msg: 16 s.sendall(msg)#發送數據 17 else: 18 print('不能為空') 19 continue 20 data = s.recv(1024)#收數據(讀數據) 21 #print(data) 22 23 print('Received', repr(data.decode())) 24 s.close()View Code
selectors :