python day 20: 線程池與協程,多進程TCP伺服器

来源:https://www.cnblogs.com/lanxing0422/archive/2019/11/01/pythonday20.html
-Advertisement-
Play Games

python day 20: 線程池與協程 2019/11/1 [TOC] 資料來自老男孩教育 2. 線程 線程適用於IO密集流操作,線程是最小的執行單元 線程之間數據是共用的,共用一塊記憶體 import threading :導入線程模塊 t = threading.Thread(target=f ...


目錄

python day 20: 線程池與協程

2019/11/1

資料來自老男孩教育

2. 線程

線程適用於IO密集流操作,線程是最小的執行單元
線程之間數據是共用的,共用一塊記憶體
import threading :導入線程模塊
t = threading.Thread(target=func,args=())
t.start()
t.join([timeout]),此處join是wait的意思,即主線程等待子線程結束,最多等2秒
主進程是否等待子線程
t.setDaemon(True/False)
線程鎖:使用RLOCK
mutex = threading.RLock()創建鎖
mutex.acquire()獲得鎖
mutex.release()釋放鎖
Event事件
event_obj = threading.Event()創建事件對象
event_obj.wait()等待狀態,flag為False則阻塞所有的線程
event_obj.clear()將flag設置為False
event_obj.set()將flal設置為True

3. 進程

創建進程
import multiprocessing
if name=="main":
p = multiprocessing.Process(target=func,args=(),)
p.deamon = True/False(主進程是否等待子進程結束)
p.start()
p.join([timeout])主進程等待子進程結束,最多等待多少秒
進程之間數據不共用,每個進程使用獨立記憶體
進程間數據共用的實現方式Manage或Array
Array
數組必須一開始就定義好數組的長度
數組的元素必須是統一的數據類型
Manage()
manage = Manage()
manage.dict()無長度限制,比Array方便,進程間數據通信使用manage.dict是最優選擇。
進程池Pool
pool = multiprocessing.Pool(5),創建一個最多支持5個進程的進程池。
pool.apply(func,(1,))申請一個進程執行某個函數。每個任務是排隊執行。每個進程有join執行。
pool.apply_async(func=Foo,args=(1,),callback=Bar),申請一個進程去執行Foo方法,將Foo方法的返回值作為實參賦值給Bar函數。每個任務是併發執行。更多是使用apply_async。每個進程沒有join執行. 進程的deamon=True。
pool.close()關閉進程池,不再接收新請求。或pool.terminate()
pool.join(),進程池中進程執行完畢後主進程再關閉。

4. 協程:gevent模塊,又叫微線程

必須安裝greenlet模塊與gevent模塊
gevent代表高性能,當發IO請求時,尤其是網路IO請求時,使用協程
gevent.sleep(0)
gevent.joinall()

from gevent import monkey; monkey.patch_all()
import requests
import gevent

def f(url):
    print("url: %s"%url)
    ret = requests.get(url)
    data = ret.text
    print(url,len(data))

gevent.joinall([
    gevent.spawn(f,"https://juejin.im/post/5aa7314e6fb9a028d936d2a4"),
    gevent.spawn(f,"https://www.cnblogs.com/lanxing0422/p/pythonday19.html"),
    gevent.spawn(f,"http://www.baidu.com"),
])



# # 用於IO密集流
# def foo():
#     print("foo")
#     # 執行gevent的sleep方法
#     gevent.sleep(0)
#     print("foo again")
#
# def bar():
#     print("bar")
#     gevent.sleep(0)
#     print("bar again")
#
# gevent.joinall([
#     gevent.spawn(foo),
#     gevent.spawn(bar),
# ])

5. 擴展

生產者消費者模型
隊列:Queue
隊列的特性:先進先出
q = queue.Queue(max)
q.put()
q.get()
q.get_nowait
q.put_nowait()
rabbit_m_queue開源的,特別牛逼的

6. 自定義線程池

python內部沒有提供,需要自定義。
實際使用中,更多是基於線程池來使用,而不是創建一個單獨的線程。
非常重要,以後會經常使用。

import threading
import queue
import time
import contextlib

Stopevent = object()


class ThreadPool(object):
    '''
    進程池
    '''

    def __init__(self, max_num):
        self.max_num = max_num
        self.q = queue.Queue()
        self.terminate_flag = False
        # 真實創建的線程列表
        self.generate_list = []
        # 空閑中的線程列表
        self.free_list = []

    def generate_thread(self):
        '''
        創建一個線程執行任務
        :return:
        '''
        t = threading.Thread(target=self.call)
        t.start()

    def run(self, func, args, callback=None):
        '''
        線程池執行一個任務
        :param func: 任務函數名
        :param args: 任務函數所需參數元組
        :param callback: 任務執行失敗或成功後執行的回調函數
        :return: 如果線程池已經終止,則返回True否則None
        '''

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        tuple_obj = (func, args, callback)
        self.q.put(tuple_obj)

    def call(self):
        '''
        迴圈獲取任務並執行
        :return:
        '''
        # 獲取當前線程並添加到列表中
        current_thread = threading.currentThread()
        self.generate_list.append(current_thread)

        # 從隊列中取任務
        event = self.q.get()
        # 當該任務的類型不是Stopevent時,死迴圈
        while event != Stopevent:
            func, args, callback = event
            status = True
            try:
                ret = func(*args)
            except Exception as e:
                status = False
                ret = e

            # 當回調函數不為空時,就執行回調函數
            if callback:
                try:
                    callback(status, ret)
                except Exception as e:
                    pass


            # self.free_list.append(current_thread)
            # event = self.q.get()
            # self.free_list.remove(current_thread)
            with self.work_state(self.free_list,current_thread):
                if not self.terminate_flag:
                    event = self.q.get()
                else:
                    event = Stopevent

        else:
            self.generate_list.remove(current_thread)

    def close(self):
        num = len(self.generate_list)
        while num:
            self.q.put(Stopevent)
            num -= 1

    def terminate(self):
        self.terminate_flag = True
        # 終止線程,不清空隊列
        # max = len(self.generate_list)
        # while max:
        #     self.q.put(Stopevent)
        #     max -= 1
        # self.q.empty()
        # 終止線程且清空隊列
        while self.generate_list:
            self.q.put(Stopevent)
        self.q.empty()


    @contextlib.contextmanager
    def work_state(self,state_list,work_thread):
        '''用於記錄線程池中正在等待的線程數'''
        state_list.append(work_thread)
        try:
            yield
        finally:
            state_list.remove(work_thread)

def foo(i):
    time.sleep(0.1)
    print("當前i是>>>",i)
    return i + 100


def call_back(status, ret):
    print("ret:>>>", ret)


pool = ThreadPool(10)

for i in range(50):
    '''
    將任務放進隊列中:
        創建線程: 
            只有空閑線程列表為空且創建的線程數量小於最大線程數時才會創建線程。
        從隊列中取到任務,線程執行任務
    '''
    pool.run(func=foo, args=(i,), callback=call_back)
pool.close()
print("gene_list",len(pool.generate_list))

7. 實現多進程TCP伺服器

serSocket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
重新設置套接字選項,重覆使用綁定的信息
當有一個有相同本地地址和埠的socket1處於TIME_WAIT狀態時,而你啟動的程式的socket2要占用該地址和埠,你的程式就要用到SO_REUSEADDR選項。

from socket import *
from multiprocessing import *
# 處理客戶端的請求併為其服務
def dealwithClient(newSocket):
    while True:
        recvData = newSocket.recv(1024)
        if len(recvData)==0:
            break
        else:
            print(recvData)
    newSocket.close()


def main():
    serSock = socket(AF_INET,SOCK_STREAM)
    # 設置套接字選項,使其可以重覆使用綁定的信息
    serSock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    bindAddr= ('',9091)
    serSock.bind(bindAddr)
    serSock.listen(5)

    try:
        while True:
            newSocket,clientAddr = serSock.accept()
            p1 = Process(target= dealwithClient,args=(newSocket,))
            p1.start()
            # 因為已經向⼦進程中copy了⼀份(引⽤) ,
            # 並且⽗進程中這個套接字也沒有用處了
            # 所以關閉
            newSocket.close()
    finally:
        serSock.close()

if __name__=='__main__':
    main()

8. 實現多線程TCP伺服器

from socket import *
from threading import *
# 處理客戶端的請求併為其服務
def dealwithClient(newSocket):
    while True:
        recvData = newSocket.recv(1024)
        if len(recvData)==0:
            break
        else:
            print(recvData)
    newSocket.close()


def main():
    serSock = socket(AF_INET,SOCK_STREAM)
    # 設置套接字選項,使其可以重覆使用綁定的信息
    serSock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    bindAddr= ('',9091)
    serSock.bind(bindAddr)
    serSock.listen(5)

    try:
        while True:
            newSocket,clientAddr = serSock.accept()
            p1 = Thread(target= dealwithClient,args=(newSocket,))
            p1.start()
            # 因為線程中共用這個套接字, 如果關閉了會導致這個套接字不可⽤,
            # 但是此時線上程中這個套接字可能還在收數據, 因此不能關閉
            # newSocket.close()
    finally:
        serSock.close()

if __name__=='__main__':
    main()

9. 協程greenlet和gevent

⽐線程更⼩的執⾏單元(微線程)
⼀個線程作為⼀個容器⾥⾯可以放置多個協程

只切換函數調用即可實現多任務,可以減少CPU的切換
協程⾃⼰主動讓出CPU

使用生成器,只切換函數調用即可完成多任務切換

import time
def A():
    while True:
        print(“----A---”)
            yield  
        time.sleep(0.5)
def B(c):
    while True:
        print(“----B---”)
        c.next()
        time.sleep(0.5)
if __name__==‘__main__’:
    a = A() # 如果一個函數中有yield,返回值就是一個生成器
    B(a)
# python中的greenlet模塊對協程進行了封裝(底層相當於yield)
# 安裝模塊: pip3 install greenlet
from greenlet import greenlet
import time
def t1():
    while True:
        print("........A........")
        gr2.switch()
        time.sleep(1)
def t2():
    while True:
        print("........b........")
        gr1.switch()#調到上次執行的地方繼續執行
        time.sleep(1)
gr1 = greenlet(t1)#創建一個greenlet對象
gr2 = greenlet(t2)
gr1.switch()#此時會執行1函數

python還有⼀個⽐greenlet更強⼤的並且能夠⾃動切換任務的模塊 gevent.
原理是當⼀個greenlet遇到IO(指的是input output 輸⼊輸出)操作時, ⽐如訪問⽹絡, 就⾃動切換到其他的greenlet, 等到IO操作完成, 再在適當的時候切換回來繼續執⾏.
io密集型和cpu密集型:一些進程絕大多數時間在計算上,稱為計算密集型(CPU密集型),此時用多進程.
有一些進程則在input 和output上花費了大多時間,稱為I/O密集型。比如搜索引擎大多時間是在等待(耗時操作),相應這種就屬於I/O密集型。此時用多線程.

import gevent
def A():
    while True:
        print(".........A.........")
        gevent.sleep(1)#用來模擬一個耗時操作
        #gevent中:當一個協程遇到耗時操作會自動交出控制權給其他協程
def B():
    while True:
        print(".........B.........")
        gevent.sleep(1)#每當遇到耗時操作,會自用轉到其他協程
g1 = gevent.spawn(A) # 創建一個gevent對象(創建了一個協程),此時就已經開始執行A
g2 = gevent.spawn(B)
g1.join()  #等待協程執行結束
g2.join()  #會等待協程運行結束後再退出

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • [TOC] 1、資料庫事務基礎概念   資料庫事務是對資料庫一次一系列的操作組成的單元,可以包含增刪改查或者只有單個操作。資料庫事務具有四大特性(ACID),ACID是資料庫事務正確執行的四個基本要素的縮寫。分別指:原子性(Atomicity)、一致性(Consistency)、隔離性(I ...
  • 匿名函數 lambda表達式 過濾器 filter(判斷函數,可迭代對象) 會根據提供的函數對指定序列做過濾 映射 map(判斷函數,可迭代對象) 會根據提供的函數對指定序列做映射 ...
  • 簡介 即達到了靜態編譯語言的安全和性能,又達到了動態語言開發維護的高效率 既有c靜態語言的運行速度,又達到了Python動態語言的快速開發 誕生原因 電腦硬體更新頻繁,性能提升很快,主流的編程語言明顯低於硬體的發展, 不能合理利用多核CPU的優勢提升系統性能 軟體的複雜度日益越來越高,維護成本越來 ...
  • int [][]a = new int [5][2]; //定義一個二維數組,其中所包含的一維數組具有兩個元素 對於一個已定義的二位數組a經行如下規則排序,首先按照每一個對應的一維數組第一個元素進行升序排序(即a[][0]),若第一個元素相等,則按照第二個元素進行升序排序(a[][1])。(特別註意 ...
  • 存在類型 形式: 或 主要為了相容 Java 的通配符 示例 scala Array[_] // 等價於 Array[T] forSome { type T} Map[_, _] // 等價於 Map[T, U] forSome { type T; type U T`| |註解| | |參數類型| ...
  • 簡單的 JDBC 操作主要有: JdbcTemplate query queryForObject queryForList update execute 簡單使用如下所示。 初始化資料庫 springboot 會自動執行 resources 文件夾下的 data.sql 和 schema.sql。 ...
  • python基礎 一.python介紹 python的創始人為吉多·範羅蘇姆(Guido van Rossum)。1989年的聖誕節期間,吉多·範羅蘇姆為了在阿姆斯特丹打發時間,決心開發一個新的腳本解釋程式,作為ABC語言的一種繼承。 最新的TIOBE排行榜,Python已經占據世界第四名的位置, ...
  • 一、取整處理 1.int() 向下取整 內置函數 2.round() 四捨五入 內置函數 3. floor() 向下取整 math模塊函數 floor的英文釋義:地板。顧名思義也是向下取整 4.ceil()向上取整 math模塊函數 ceil的英文釋義:天花板。 5.modf() 分別取整數部分和小 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...