Python全棧開發之11、進程和線程

来源:http://www.cnblogs.com/Wxtrkbc/archive/2016/06/22/5601236.html
-Advertisement-
Play Games

一、線程 多任務可以由多進程完成,也可以由一個進程內的多線程完成,一個進程內的所有線程,共用同一塊記憶體python中創建線程比較簡單,導入threading模塊,下麵來看一下代碼中如何創建多線程。 主線程從上到下執行,創建5個子線程,列印出'start',然後等待子線程執行完結束,如果想讓線程要一個 ...


一、線程

  多任務可以由多進程完成,也可以由一個進程內的多線程完成,一個進程內的所有線程,共用同一塊記憶體python中創建線程比較簡單,導入threading模塊,下麵來看一下代碼中如何創建多線程。

def f1(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=f1, args=(i,))
        t.start()
    print('start')          # 主線程等待子線程完成,子線程併發執行


>>start
>>2
>>1
>>3
>>0
>>4

  主線程從上到下執行,創建5個子線程,列印出'start',然後等待子線程執行完結束,如果想讓線程要一個個依次執行完,而不是併發操作,那麼就要使用join方法。下麵來看一下代碼

import threading
import time

def f1(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=f1, args=(i,))
        t.start()
        t.join()
    print('start')      # 線程從上到下依次執行,最後列印出start

>>0
>>1
>>2
>>3
>>4
>>start

  上面的代碼不適用join的話,主線程會預設等待子線程結束,才會結束,如果不想讓主線程等待子線程的話,可以子線程啟動之前設置將其設置為後臺線程,如果是後臺線程,主線程執行過程中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均停止,前臺線程則相反,若果不加指定的話,預設為前臺線程,下麵從代碼來看一下,如何設置為後臺線程。例如下麵的例子,主線程直接列印start,執行完後就結束,而不會去等待子線程,子線程中的數據也就不會列印出來

import threading
import time

def f1(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=f1, args=(i,))
        t.setDaemon(True)
        t.start()

    print('start')      # 主線程不等待子線程

>> start 

  除此之外,自己還可以為線程自定義名字,通過 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name參數,除此之外,Thread還有一下一些方法

  • t.getName() : 獲取線程的名稱
  • t.setName() : 設置線程的名稱 
  • t.name : 獲取或設置線程的名稱
  • t.is_alive() : 判斷線程是否為激活狀態
  • t.isAlive() :判斷線程是否為激活狀態
  • t.isDaemon() : 判斷是否為守護線程

二、線程鎖

  由於線程是共用同一份記憶體的,所以如果操作同一份數據,很容易造成衝突,這時候就可以為線程加上一個鎖了,這裡我們使用Rlock,而不使用Lock,因為Lock如果多次獲取鎖的時候會出錯,而RLock允許在同一線程中被多次acquire,但是需要用n次的release才能真正釋放所占用的瑣,一個線程獲取了鎖在釋放之前,其他線程只有等待。 

import threading
G = 1
lock = threading.RLock()
def fun():
    lock.acquire()    # 獲取鎖
    global G
    G += 2
    print(G, threading.current_thread().name)
    lock.release()   # 釋放鎖
    return


for i in range(10):
    t = threading.Thread(target=fun, name='t-{}'.format(i))
    t.start()

3 t-0
5 t-1
7 t-2
9 t-3
11 t-4
13 t-5
15 t-6
17 t-7
19 t-8
21 t-9    

三、線程間通信Event

Event是線程間通信最間的機制之一,主要用於主線程式控制制其他線程的執行,主要用過wait,clear,set,這三個方法來實現的的,下麵來看一個簡單的例子,

import threading
import time

def f1(event):
    print('start:')
    event.wait()            # 阻塞在,等待 set
    print('end:')

if __name__ == '__main__':
    event_obj  = threading.Event()
    for i in range(5):
        t = threading.Thread(target=f1, args=(event_obj,))
        t.start()

    event_obj.clear()      # 清除標誌位 
    inp = input('>>>>:')
    if inp == 'true':
        event_obj.set()   # 設置標誌位

四、隊列  

  可以簡單的理解為一種先進先出的數據結構,比如用於生產者消費者模型,或者用於寫線程池,以及前面寫select的時候,讀寫分離時候可用隊列存儲數據等等,以後用到隊列的地方很多,因此對於隊列的用法要熟練掌握。下麵首先來看一下隊列提供了哪些用法

q = queue.Queue(maxsize=0)  # 構造一個先進顯出隊列,maxsize指定隊列長度,為0時,表示隊列長度無限制。

q.join()        # 等到隊列為kong的時候,在執行別的操作
q.qsize()       # 返回隊列的大小 (不可靠)
q.empty()       # 當隊列為空的時候,返回True 否則返回False (不可靠)
q.full()        # 當隊列滿的時候,返回True,否則返回False (不可靠)
q.put(item, block=True, timeout=None)   # 將item放入Queue尾部,item必須存在,參數block預設為True,表示當隊列滿時,會等待
                        # 為False時為非阻塞,此時如果隊列已滿,會引發queue.Full 異常。 可選參數timeout,表示會阻塞設置的時間,
                        # 如果在阻塞時間里 隊列還是無法放入,則引發 queue.Full 異常

q.get(block=True, timeout=None)     #  移除並返回隊列頭部的一個值,可選參數block預設為True,表示獲取值的時候,如果隊列為空,則阻塞
                       #  阻塞的話若此時隊列為空,則引發queue.Empty異常。 可選參數timeout,表示會阻塞設置的時間,
q.get_nowait()               #  等效於 get(item,block=False) 

下麵用代碼來簡單的演示下,消費者生成者模型,只是簡單的演示下。

message = queue.Queue(10)

def product(num):
    for i in range(num):
        message.put(i)
        print('將{}添加到隊列中'.format(i))
        time.sleep(random.randrange(0, 1))


def consume(num):
    count = 0
    while count<num:
        i = message.get()
        print('將{}從隊列取出'.format(i))
        time.sleep(random.randrange(1, 2))
        count += 1


t1 = threading.Thread(target=product, args=(10, ))
t1.start()

t2 = threading.Thread(target=consume, args=(10, ))
t2.start()

五、進程 

  線程的上一級就是進程,進程可包含很多線程,進程和線程的區別是進程間的數據不共用,多進程也可以用來處理多任務,不過多進程很消耗資源,計算型的任務最好交給多進程來處理,IO密集型最好交給多線程來處理,此外進程的數量應該和cpu的核心說保持一致。

在windows中不能用fork來創建多進程,因此只能導入multiprocessing,來模擬多進程,下麵首先來看一下怎麼創建進程,大家可以先猜一下下麵的結果是什麼

l = []

def f(i):
    l.append(i)
    print('hi', l)

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=f, args=(i,))        # 數據不共用,創建10份 l列表
        p.start()

六、進程間數據共用  

進程間的數據是不共用的,但是我如果非要數據共用了,那麼就需要用其他方式了 

1、Value,Array

def f(a, b):
    a.value = 3.111
    for i in range(len(b)):
        b[i] += 100

if __name__ == '__main__':
    num = Value('f', 3.333)        # 類似C語言中的 浮點型數
    l = Array('i', range(10))       # 類似C語言中的整形數組,長度為10
    print(num.value)
    print(l[:])

    p = Process(target=f, args=(num, l))
    p.start()
    p.join()
    print(num.value)              # 大家自己運行一下,看下兩次列印結果是否一樣
    print(l[:])

2、manage  

方式一,使用的都是C語言中的數據結構,如果大家對c不熟悉的話,用起來比較麻煩,方式2就可以支持python自帶的數據,下麵來看一下

from multiprocessing import Process,Manager

def Foo(dic, i):
    dic[i] = 100 + i
    print(dic.values())

if __name__ == '__main__':
    manage = Manager()
    dic = manage.dict()

    for i in range(2):
        p = Process(target=Foo, args=(dic, i))
        p.start()
        p.join()

七、進程池  

  實際應用中,並不是每次執行任務的時候,都去創建多進程,而是維護了一個進程池,每次執行的時候,都去進程池取一個,如果進程池裡面的進程取光了,就會阻塞在那裡,直到進程池中有可用進程為止。首先來看一下進程池提供了哪些方法

  • apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,由於這個原因,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。如果callback被指定,那麼callback可以接收一個參數然後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被立即完成,否則處理結果的線程會被阻塞。

  • close() : 等待任務完成後在停止工作進程,阻止更多的任務提交到pool,待任務完成後,工作進程會退出。

  • terminate() : 不管任務是否完成,立即停止工作進程。在對pool對象進程垃圾回收的時候,會立即調用terminate()。

  • join() : 等待工作線程的退出,在調用join()前,必須調用close() or terminate()。這樣是因為被終止的進程需要被父進程調用wait(join等價與wait,否則進程會成為僵屍進程。

下麵來簡單的看一下代碼怎麼用的

from multiprocessing import Pool
import time

def f1(i):
    time.sleep(1)
    # print(i)
    return i

def cb(i):
    print(i)

if __name__ == '__main__':
    poo = Pool(5)
    for i in range(20):
        # poo.apply(func=f1, args=(i,))   # 串列執行,排隊執行 有join
        poo.apply_async(func=f1, args=(i,), callback=cb)  # 併發執行 主進程不等子進程,無join
    print('**********')

    poo.close()
    poo.join()

八、線程池 

  對於前面的進程池,python自帶了一個模塊Pool供我們使用,但是對於線程池,則沒有提供,因此需要我們自己寫,自己寫的話,就需要用到隊列,下麵我們來看一下自己怎麼實現一個線程池,首先寫一個最簡單的版本。 

import threading
import time
import queue

class ThreadPool:
    def __init__(self, max_num=20):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.add()

    def add(self):
        self.queue.put(threading.Thread)

    def get(self):
        return self.queue.get()

def f(tp, i):
    time.sleep(1)
    print(i)
    tp.add()

p = ThreadPool(10)
for i in range(20):
    thread = p.get()
    t = thread(target=f, args=(p, i))
    t.start()

上述代碼寫了一個線程池類,基本實現了線程池的功能,但是有很多缺點,沒有實現回掉函數,每次執行任務的時候,任務處理函數每次執行完都需要自動執行對象的add方法,將線程對象添加到隊列中去,而且類初始化的時候,一次性將所有的線程類都添加到隊列中去了,總之上面的線程池雖然實現簡單,但是實際上卻有很多問題,下麵來看一個真正意義上的線程池。

  在寫代碼之前,我們先來看一下該怎麼設計這樣一個線程池,上面的線程池,我們的隊列中,存的是線程類,我們每處理一個任務都實例化一個線程,然後執行完了之後,該線程就被丟棄了,這樣有點不合適。我們這次設計的時候,

  1. 隊列中存的不是線程類,而是任務,我們從隊列中拿取的都是任務
  2. 每次執行任務的時候,不是都要生成一個線程,而是如果以前生成的線程有空閑的話,就用以前的線程
  3. 支持回掉機制,支持close,terminate

下麵來一下代碼是怎麼實現的

import threading
import queue
import time
import contextlib

class ThreadingPool:
    def __init__(self, num):
        self.max = num
        self.terminal = False
        self.q = queue.Queue()
        self.generate_list = []         # 保存已經生成的線程
        self.free_list = []             # 保存那些已經完成任務的線程

    def run(self, func, args=None, callbk=None):
        self.q.put((func, args, callbk))            # 將任務信息作為一個元祖放到隊列中去
        if len(self.free_list) == 0 and len(self.generate_list) < self.max:
           self.threadstart()

    def threadstart(self):
        t = threading.Thread(target=self.handel)
        t.start()

    def handel(self):
        current_thread = threading.current_thread()
        self.generate_list.append(current_thread)
        event = self.q.get()
        while event != 'stop':
            func, args, callbk = event
            flag = True
            try:
                ret = func(*args)
            except Exception as e:
                flag = False
                ret = e

            if callbk is not None:
                try:
                    callbk(ret)
                except Exception as e:
                    pass

            if not self.terminal:
                with self.auto_append_remove(current_thread):
                    event = self.q.get()
            else:
                event = 'stop'
        else:
            self.generate_list.remove(current_thread)

    def terminate(self):
        self.terminal = True

        while self.generate_list:
            self.q.put('stop')
        self.q.empty()

    def close(self):
        num = len(self.generate_list)
        while num:
            self.q.put('stop')
            num -= 1

    @contextlib.contextmanager
    def auto_append_remove(self, thread):
        self.free_list.append(thread)
        try:
            yield
        finally:
            self.free_list.remove(thread)

def f(i):
    # time.sleep(1)
    return i

def f1(i):
    print(i)

p = ThreadingPool(5)
for i in range(20):
    p.run(func=f, args=(i,), callbk=f1)

p.close()

九、協程 

協程,又稱微線程,協程執行看起來有點像多線程,但是事實上協程就是只有一個線程,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯,此外因為只有一個線程,不需要多線程的鎖機制,也不存在同時寫變數衝突。協程的適用場景:當程式中存在大量不需要CPU的操作時(IO)下麵來看一個利用協程例子

from gevent import monkey
import gevent
import requests

# 把標準庫中的thread/socket等給替換掉
# 這樣我們在後面使用socket的時候可以跟平常一樣使用,無需修改任何代碼,但是它變成非阻塞的了.
monkey.patch_all()      # 猴子補丁

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

上面的例子,利用協程,一個線程完成所有的請求,發出請求的時候,不會等待回覆,而是一次性將所有的請求都發出求,收到一個回覆就處理一個回覆,這樣一個線程就解決了所有的事情,效率極高。

十、小結 

這篇博文是pyton基礎知識的最後一篇,後面會講的博文會講開始講前端的知識,這裡附上目錄http://www.cnblogs.com/Wxtrkbc/p/5606048.html,以後會繼續更新的,

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.ASP.NET Identity 適用所有類型的asp.net程式 ASP.NET MVC, Web Forms, Web Pages, Web API, and SignalR。 2.非常方便的擴展用戶數據欄位。只需一行代碼 設置好你的資料庫連接信息: 修改context的名稱: 打開試圖-》... ...
  • 一、前言 大多數系統裡面好像都有獲取消息的功能,但這些消息來源都不是實時的,比如你開兩個瀏覽器,用兩個不同的賬號登錄,用一個賬號給另外一個賬號發送消息,然而並不會實時收到消息,必須要自己手動F5刷新一下頁面才會顯示自己的消息,這樣感覺用戶體驗不太好。之前看了Learning hard關於Signal ...
  • 原文地址:http://docode.top/Article/Detail/10002 目錄: 1、Http協議上傳文件(以圖片為例)請求報文體內容格式 2、完整版HttpWebRequest模擬上傳文件請求報文內容封裝 3、asp.net(c#)使用HttpWebRequest攜帶請求參數模擬上傳 ...
  • 前言 用過Lucene.net的都知道,我們自己搭建索引伺服器時和解決搜索匹配度的問題都用到過盤古分詞。其中包含一個詞典。 那麼既然用到了這種國際化的框架,那麼就避免不了中文分詞。尤其是國內特殊行業比較多。比如油田系統從勘探、打井、投產等若幹環節都涉及一些專業辭彙。 再像電商,手機、手機配件、筆記本 ...
  • Arigis for WPF 標繪 箭頭 ArcGISPlotSilverlightAPI ...
  • 至2002微軟公司推出.NET平臺已近15年,在互聯網快速迭代的浪潮中,許多語言已被淘汰,同時也有更多新的語言涌現,但 .Net 依然堅挺的站在系統開發平臺的一線陣營中 ...
  • 先給大家介紹一個開源工具Sigar 官網:http://sigar.hyperic.com/ API: http://www.hyperic.com/support/docs/sigar/index-all.html(由於是英文的,英文不好的可以用谷歌瀏覽器的翻譯功能,直接轉換為簡體中文進行閱讀) ...
  • 1、python的moudles文件中__all__作用 Python的moudle是很重要的一個概念,我看到好多人寫的moudle里都有一個__init__.py文件。有的__init__.py中是空白,有的卻會有__all__參數。搜索了下總結下__all__參數的作用。 如果其他頁面impor ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...