day9--隊列queue

来源:http://www.cnblogs.com/gengcx/archive/2017/09/10/7500154.html
-Advertisement-
Play Games

queue隊列 Queue是python標準庫中的線程安全的隊列(FIFO)實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列,用來在生產者和消費者線程之間的信息傳遞。一個線程放入數據,另外一個線程取數據。 class queue.Queue(maxsize=0) #先入先出 class ...


queue隊列

    Queue是python標準庫中的線程安全的隊列(FIFO)實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列,用來在生產者和消費者線程之間的信息傳遞。一個線程放入數據,另外一個線程取數據。

    class queue.Queue(maxsize=0)                                            #先入先出

    class queue.LifoQueue(maxsize=0)                                        #後入先出(Last in first out)

    class queue.PriorityQueue(maxsize=0)                                    #存儲數據時可設置優先順序的隊列

    隊列中的方法:

    1.queue.Queue.get()     #獲取隊列數據,當隊列是空的時候,會卡主,等待數據的放入,沒有數據放入,會一直阻塞

    get(self, block=True, timeout=None)     #預設狀態下,block()如果沒有數據是阻塞的

import queue

q = queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
print(q.get())
運行程式:
1
2
阻塞................

 

    隊列就是用來存取數據的,當數據取完之後,就會等待新的數據放入,get()就會一直等待,知道數據放入。要想不等待,可以使用下麵方法:

    當然,使用get()加上參數block=False也能實現和get_nowait()一樣的功能。

    block=True(False)設置當隊列是空的時候,是否阻塞,True阻塞,False不阻塞,報錯。timeout=None(time)設置阻塞時間,即等待一段時間,如果在這段時間內,沒有數據放入,就報錯。

    2.get_nowait()          #獲取數據,如果隊列是空的,則報錯  

import queue

q = queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
print(q.get_nowait())
執行結果如下:
1
2
Traceback (most recent call last):
  File "/home/zhuzhu/day9/隊列.py", line 8, in <module>
    print(q.get_nowait())
  File "/usr/lib/python3.5/queue.py", line 192, in get_nowait
    return self.get(block=False)
  File "/usr/lib/python3.5/queue.py", line 161, in get
    raise Empty
queue.Empty

    上面使用,get_nowait(),如果隊列是空的,則報錯,可以用異常來抓取異常,然後可以繼續執行。

    3.queue.Queue.qsize()    #判斷隊列裡面元素的個數

import queue

q = queue.Queue()
print(q.qsize())
q.put(1)
print(q.qsize())
q.put(2)
print(q.qsize())
執行結果:
0
1
2

    q.qsize()是判斷隊列的長度,如果長度為0,說明隊列是空的,這個時候使用get()就要註意,程式會阻塞。

    4.q.qut()   #向隊列中放入數據

    put(self, item, block=True, timeout=None)

    put()和get()差不多一樣,put()當隊列滿的時候,會報錯,block是設置阻塞狀態是否開啟,timeout是設置阻塞時間,預設一直阻塞。

    5.q.empty(self)     #判斷隊列是否是空Return True if the queue is empty, False otherwise (not reliable!).

    6.q.full()         #判斷隊列是否是滿的Return True if the queue is full, False otherwise (not reliable!)

    7.put_nowait()  等價於put(block=False)   #如果隊列滿了,則報錯Put an item into the queue without blocking

    下麵來看一下LifoQueue,後進先出的情形:

import queue

q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)

print("第一個取出:",q.get())
print("第二個取出:",q.get())

    上面是LifoQueue(maxsize=0)的情形,後進入的先被取出。

    下麵來看一下PriorityQueue的情形,有優先順序的queue:

import queue

q = queue.PriorityQueue()

q.put((3,"alex"))
q.put((1,"geng"))
q.put((8,"zeng"))

print("第一個取出",q.get())
print("第二個取出:",q.get())
print("第三個取出:",q.get())
執行結果:
第一個取出 (1, 'geng')
第二個取出: (3, 'alex')
第三個取出: (8, 'zeng')

    上面程式中,是有優先順序的放入,put((等級,內容)),存放以元組形式放入,前一個是登記,後面一個是消息。用來VIP優先順序的情形。

    生產者消費者模型

    在併發編程中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程式的整體處理數據的速度。

    為什麼要使用生產者和消費者模式

    線上程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為瞭解決這個問題於是引入了生產者和消費者模式。

    什麼是生產者消費者模式

    生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

    下麵來學習一個最基本的生產者消費者模型的例子:

'''生產者消費者模型就是兩個線程,一個生產,另外一個消費,兩者相互配合,有交互'''
import queue,time,threading

def producer(name):
    '''定義生產者模型'''
    count = 1                                                    #初始化變數
    while True:
        q.put("骨頭%s" %count)                                   #生成骨頭
        print("[%s]生成了骨頭%s" %(name,count))
        count += 1                                               #每次生產一個
        time.sleep(0.5)                                          #定義產能,生產效率

def consumer(name):
    '''定義消費者模型'''
    while True:
        print("\033[31m[%s] 吃了[%s]\033[0m" %(name,q.get()))
        time.sleep(1)                                             #定義消費效率


if __name__ == "__main__":
    try:
        q = queue.Queue(maxsize=10)                              #初始化一個Queue,並且定義最大容量
        p = threading.Thread(target=producer,args=("geng",))     #初始化生產者線程
        p.start()
    except KeyboardInterrupt as f:
        print("生產者線程斷開了!!")

    try:
        c=threading.Thread(target=consumer,args=("alex",))
        c.start()
    except KeyboardInterrupt as e:
        print("線程斷開了!!!")
執行結果:
[geng]生成了骨頭1
[alex] 吃了[骨頭1]
[geng]生成了骨頭2
[geng]生成了骨頭3
[alex] 吃了[骨頭2]
[geng]生成了骨頭4
[alex] 吃了[骨頭3]
[geng]生成了骨頭5
[geng]生成了骨頭6
[alex] 吃了[骨頭4]
[geng]生成了骨頭7
[geng]生成了骨頭8
[alex] 吃了[骨頭5]
[geng]生成了骨頭9
[geng]生成了骨頭10
[alex] 吃了[骨頭6]
[geng]生成了骨頭11
[geng]生成了骨頭12
[alex] 吃了[骨頭7]
[geng]生成了骨頭13
[geng]生成了骨頭14

    上面就是生產者和消費者的簡單模型,使用了queue(隊列),生成者就是生成商品,然後放到隊列中;消費者就是去這個隊列中根據條件取數,這樣不斷生產和取數,就是簡單的生產者消費者模型,其中time.sleep()是生成效率和消費效率,控製程序的節奏,而count+=1代表消費者生產的能力,每次只生成一個,如果把這個調成10,那麼效率就很高,每次生成完成之後,都要等待很久。當然,要調效率,要修改一下代碼。

    隊列queue的源代碼如下:

'''A multi-producer, multi-consumer queue.'''

try:
    import threading
except ImportError:
    import dummy_threading as threading
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time

__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']

class Empty(Exception):
    'Exception raised by Queue.get(block=0)/get_nowait().'
    pass

class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
    pass

class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()


class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)


class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()

    看看源代碼,能夠讓自己對這些方法,有更好的理解,之後會多看看源代碼是如何寫的。多參考源代碼的寫法,裡面有很多好的書寫習慣和格式。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • WebForm.aspx 頁面通過 AJAX 訪問WebForm.aspx.cs類中的方法,獲取數據 WebForm1.aspx 頁面 (原生AJAX請求,寫法一) WebForm1.aspx 頁面 (AJAX請求,寫法二,一般情況下我們用這種) WebForm1.aspx.cs 如果你是try.. ...
  • APM非同步編程模式最具代表性的特點是:一個非同步功能由以Begin開頭、End開頭的兩個方法組成。Begin開頭的方法表示啟動非同步功能的執行,End開頭的方法表示等待非同步功能執行結束並返回執行結果。 ...
  • 1 Maven的簡介 是apache下的一個開源項目,是純java開發,並且只是用來管理java項目的 Svn eclipse maven量級 同一個項目,普通的傳統項目(24M)而Maven項目只需要(724KB) 分析:maven項目為什麼這麼小?沒有jar。 需要jar嗎?肯定需要。沒有存在於 ...
  • 超過十年以上,沒有比解釋器全局鎖(GIL)讓Python新手和專家更有挫折感或者更有好奇心。 Python的底層 要理解GIL的含義,我們需要從Python的基礎講起。像C++這樣的語言是編譯型語言,所謂編譯型語言,是指程式輸入到編譯器,編譯器再根據語言的語法進行解析,然後翻譯成語言獨立的中間表示, ...
  • 前段時間應因緣梳理了下自己的 Java 知識體系, 成文一篇望能幫到即將走進或正在 Java 世界跋涉的程式員們。 第一張,基礎圖 大約在 2003 年我開始知道 Java 的(當時還在用 Delphi),但到 2004 年本科畢業才開始正式決定學習 Java。 那時覺得用 Delphi 寫 C/S ...
  • Resource有兩種使用場景 1.Resource 當Resource後面沒帶參數的時候是根據它所註釋的屬性名稱到applicationContext.xml文件中查找是否有bean的id與之匹配,如果有,就將對應的class賦值給它, 如果沒有則根據註釋屬性的類型到配置文件進行匹配,如果有就賦值 ...
  • 所謂JDBC就是利用java與資料庫相連接的技術,從資料庫獲取既有的信息或者把網頁上的信息存儲到資料庫。 這裡簡單的介紹公司的一個小項目中的一部分,由於代碼較多,所以用圖片形式進行展示。源碼請查看源碼庫,稍後上傳。 圖1-信息圖 圖2-用戶圖 如上圖所示的兩個模塊,對應著資料庫當中的兩張表。第一張表 ...
  • 前言 有時候想用一個簡潔點兒的備忘錄,發現沒有簡潔好用的,於是就想著開發一個,秉著簡潔 的思想,所以連界面都沒有,只能通過命令行來操作(儘可能的將命令簡化)。設計的時候 借鑒了git分支的思想,每個備忘錄都等同於一個分支,我們可以創建多個備忘錄。功能上 可以查看所有備忘錄的名稱,切換備忘錄,添加備忘 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...