大家好,併發編程 進入第十一章。 前面兩節,我們講了協程中的單任務和多任務 這節我們將通過一個小實戰,來對這些內容進行鞏固。 在實戰中,將會用到以下知識點: 多線程的基本使用 Queue消息隊列的使用 Redis的基本使用 asyncio的使用 . 動態添加協程 在實戰之前,我們要先瞭解下在asyn ...
大家好,併發編程
進入第十一章。
前面兩節,我們講了協程中的單任務
和多任務
這節我們將通過一個小實戰,來對這些內容進行鞏固。
在實戰中,將會用到以下知識點:
- 多線程的基本使用
- Queue消息隊列的使用
- Redis的基本使用
- asyncio的使用
. 動態添加協程
在實戰之前,我們要先瞭解下在asyncio中如何將協程態添加到事件迴圈中的。這是前提。
如何實現呢,有兩種方法:
- 主線程是
同步
的
import time
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一個在後臺永遠運行的事件迴圈
asyncio.set_event_loop(loop)
loop.run_forever()
def do_sleep(x, queue, msg=""):
time.sleep(x)
queue.put(msg)
queue = Queue()
new_loop = asyncio.new_event_loop()
# 定義一個線程,並傳入一個事件迴圈對象
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(time.ctime())
# 動態添加兩個協程
# 這種方法,在主線程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一個")
new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二個")
while True:
msg = queue.get()
print("{} 協程運行完..".format(msg))
print(time.ctime())
由於是同步的,所以總共耗時6+3=9
秒.
輸出結果
Thu May 31 22:11:16 2018
第一個 協程運行完..
Thu May 31 22:11:22 2018
第二個 協程運行完..
Thu May 31 22:11:25 2018
- 主線程是
非同步
的,這是重點,一定要掌握。。
import time
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一個在後臺永遠運行的事件迴圈
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_sleep(x, queue, msg=""):
await asyncio.sleep(x)
queue.put(msg)
queue = Queue()
new_loop = asyncio.new_event_loop()
# 定義一個線程,並傳入一個事件迴圈對象
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(time.ctime())
# 動態添加兩個協程
# 這種方法,在主線程是非同步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一個"), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二個"), new_loop)
while True:
msg = queue.get()
print("{} 協程運行完..".format(msg))
print(time.ctime())
輸出結果
由於是同步的,所以總共耗時max(6, 3)=6
秒
Thu May 31 22:23:35 2018
第二個 協程運行完..
Thu May 31 22:23:38 2018
第一個 協程運行完..
Thu May 31 22:23:41 2018
. 實戰:利用redis實現動態添加任務
對於併發任務,通常是用生成消費模型,對隊列的處理可以使用類似master-worker的方式,master主要用戶獲取隊列的msg,worker用戶處理消息。
為了簡單起見,並且協程更適合單線程的方式,我們的主線程用來監聽隊列,子線程用於處理隊列。這裡使用redis的隊列。主線程中有一個是無限迴圈,用戶消費隊列。
先安裝Redis
到 https://github.com/MicrosoftArchive/redis/releases 下載
解壓到你的路徑。
然後,在當前路徑運行cmd,運行redis的服務端。
服務開啟後,我們就可以運行我們的客戶端了。
並依次輸入key=queue,value=5,3,1的消息。
一切準備就緒之後,我們就可以運行我們的代碼了。
import time
import redis
import asyncio
from queue import Queue
from threading import Thread
def start_loop(loop):
# 一個在後臺永遠運行的事件迴圈
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_sleep(x, queue):
await asyncio.sleep(x)
queue.put("ok")
def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
return redis.Redis(connection_pool=connection_pool)
def consumer():
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)
if __name__ == '__main__':
print(time.ctime())
new_loop = asyncio.new_event_loop()
# 定義一個線程,運行一個事件迴圈對象,用於實時接收新任務
loop_thread = Thread(target=start_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
# 創建redis連接
rcon = get_redis()
queue = Queue()
# 子線程:用於消費隊列消息,並實時往事件對象容器中添加新任務
consumer_thread = Thread(target=consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()
while True:
msg = queue.get()
print("協程運行完..")
print("當前時間:", time.ctime())
稍微講下代碼
loop_thread
:單獨的線程,運行著一個事件對象容器,用於實時接收新任務。consumer_thread
:單獨的線程,實時接收來自Redis的消息隊列,並實時往事件對象容器中添加新任務。
輸出結果
Thu May 31 23:42:48 2018
協程運行完..
當前時間: Thu May 31 23:42:49 2018
協程運行完..
當前時間: Thu May 31 23:42:51 2018
協程運行完..
當前時間: Thu May 31 23:42:53 2018
我們在Redis,分別發起了5s
,3s
,1s
的任務。
從結果來看,這三個任務,確實是併發執行的,1s
的任務最先結束,三個任務完成總耗時5s
運行後,程式是一直運行在後臺的,我們每一次在Redis中輸入新值,都會觸發新任務的執行。。
好了,經過這個實戰內容,你應該對asyncio
的實際應用有了一個更深刻的認識了,至此,你已經可以使用asyncio
來實現任務的併發。快去體驗一下。如果有什麼疑問,請在後臺加我微信與我聯繫。。