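The queue module
queue.Queue is the thread-safe FIFO queue implementation in the Python standard library. It provides a first-in, first-out data structure suited to multithreaded programs and is used to pass messages between producer and consumer threads: one thread puts data in, another thread takes it out.
class queue.Queue(maxsize=0)  # first in, first out (FIFO)
class queue.LifoQueue(maxsize=0)  # last in, first out (LIFO)
class queue.PriorityQueue(maxsize=0)  # entries are retrieved in priority order
Queue methods:
1. queue.Queue.get()  # Removes and returns an item. If the queue is empty, the call blocks until another thread puts an item in; if nothing is ever put in, it blocks forever.
get(self, block=True, timeout=None)  # block=True is the default, so get() blocks when there is no data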
import queue

q = queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
print(q.get())  # the queue is empty by now, so this call blocks
Output:
1
2
(blocks here, waiting for an item)
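A queue exists to store data and hand it out. Once every item has been taken, get() waits until a new item is put in. To avoid waiting, use get_nowait(), described in item 2 below.
Calling get() with block=False does the same thing as get_nowait().
block (True or False) controls whether get() blocks when the queue is empty: True blocks; False does not block and raises queue.Empty instead. timeout (None or a number of seconds) limits how long a blocking call waits: if no item arrives within that time, queue.Empty is raised.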
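The two parameters can be tried out on an empty queue. The following is a minimal sketch; the variable names and the 0.1 second timeout are chosen only for illustration.

```python
import queue

q = queue.Queue()

# Non-blocking get on an empty queue raises queue.Empty immediately.
try:
    q.get(block=False)
    nonblocking_result = "got an item"
except queue.Empty:
    nonblocking_result = "empty"

# Blocking get with a timeout waits up to 0.1 s, then raises queue.Empty.
try:
    q.get(timeout=0.1)
    timeout_result = "got an item"
except queue.Empty:
    timeout_result = "empty"

print(nonblocking_result, timeout_result)  # prints: empty empty
```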
2. get_nowait()  # Returns an item; raises queue.Empty if the queue is empty
import queue

q = queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
print(q.get_nowait())  # the queue is empty, so this raises queue.Empty
Output:
1
2
Traceback (most recent call last):
File "/home/zhuzhu/day9/隊列.py", line 8, in <module>
print(q.get_nowait())
File "/usr/lib/python3.5/queue.py", line 192, in get_nowait
return self.get(block=False)
File "/usr/lib/python3.5/queue.py", line 161, in get
raise Empty
queue.Empty
As shown above, get_nowait() raises queue.Empty when the queue is empty. Catch the exception with try/except and the program can carry on.
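One way to catch the exception is sketched below. The helper name drain is hypothetical (it is not part of the queue module); it collects whatever is in the queue and stops at the first queue.Empty instead of crashing.

```python
import queue


def drain(q):
    """Collect everything currently in q without ever blocking.

    Hypothetical helper for illustration; not part of the queue module.
    """
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break  # nothing left, stop instead of raising
    return items


q = queue.Queue()
q.put(1)
q.put(2)
drained = drain(q)
print(drained)  # [1, 2]
```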
3. queue.Queue.qsize()  # Returns the number of items in the queue
import queue

q = queue.Queue()
print(q.qsize())
q.put(1)
print(q.qsize())
q.put(2)
print(q.qsize())
Output:
0
1
2
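q.qsize() returns the length of the queue. A length of 0 means the queue is empty, so take care before calling get() at that point: the program will block. In multithreaded code another thread can change the queue right after qsize() returns, so treat the value as approximate.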
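4. q.put()  # Puts an item into the queue
put(self, item, block=True, timeout=None)
put() mirrors get(). When the queue is full, put() blocks by default until a slot becomes free. block controls whether the call blocks at all, and timeout caps how long it waits (the default is to wait indefinitely). With block=False, or once the timeout expires, queue.Full is raised.
5. q.empty()  # Tests whether the queue is empty. Return True if the queue is empty, False otherwise (not reliable!).
6. q.full()  # Tests whether the queue is full. Return True if the queue is full, False otherwise (not reliable!).
7. put_nowait(item) is equivalent to put(item, block=False)  # Put an item into the queue without blocking; raises queue.Full if the queue is full.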
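The behaviour of put(), put_nowait() and full() on a full queue can be checked with a small bounded queue. This is a minimal single-threaded sketch; the maxsize of 2 and the 0.1 second timeout are arbitrary illustrative values.

```python
import queue

q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")
was_full = q.full()  # True here because no other thread touches q

# put_nowait() on a full queue raises queue.Full immediately.
try:
    q.put_nowait("c")
    outcome_nowait = "stored"
except queue.Full:
    outcome_nowait = "full"

# put() with a timeout waits up to 0.1 s for a free slot, then raises queue.Full.
try:
    q.put("c", timeout=0.1)
    outcome_timeout = "stored"
except queue.Full:
    outcome_timeout = "full"

print(was_full, outcome_nowait, outcome_timeout)  # True full full
```

Calling q.put("c") with the default arguments here would block forever, since nothing ever takes an item out.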
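Now look at LifoQueue, the last-in, first-out case:
import queue

q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print("First out:", q.get())
print("Second out:", q.get())
The above shows LifoQueue(maxsize=0): the most recently added items are taken out first, so the program prints 3 and then 2.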
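Next, PriorityQueue, a queue with priorities:
import queue

q = queue.PriorityQueue()
q.put((3, "alex"))
q.put((1, "geng"))
q.put((8, "zeng"))
print("First out:", q.get())
print("Second out:", q.get())
print("Third out:", q.get())
Output:
First out: (1, 'geng')
Second out: (3, 'alex')
Third out: (8, 'zeng')
In the program above, items are put in with a priority: put((priority, data)). Each entry is a tuple whose first element is the priority and whose second element is the message, and the entry with the lowest priority number comes out first. This suits cases such as giving VIPs priority.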
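When two entries share the same priority number, the tuples are compared on their next element, which fails if that element cannot be ordered (two dicts, for example). A common workaround, sketched here, is to add an increasing sequence number as the second tuple element. The put_task helper and the counter are assumptions made for this example, not part of the queue module.

```python
import itertools
import queue

q = queue.PriorityQueue()
counter = itertools.count()  # increasing tie-breaker: 0, 1, 2, ...


def put_task(priority, payload):
    """Hypothetical helper: store (priority, sequence number, payload)."""
    q.put((priority, next(counter), payload))


# The payloads are dicts, which cannot be compared with "<". Without the
# sequence number, the two priority-1 entries would raise TypeError.
put_task(1, {"name": "geng"})
put_task(1, {"name": "alex"})
put_task(0, {"name": "zeng"})

order = [q.get()[2]["name"] for _ in range(3)]
print(order)  # ['zeng', 'geng', 'alex']
```

Entries with equal priority come out in the order they were put in, because the sequence number decides the comparison before the payload is ever looked at.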
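The producer-consumer model
In concurrent programming, the producer-consumer pattern can solve the great majority of concurrency problems. It raises the overall data-processing speed of a program by balancing the working capacity of the producing threads and the consuming threads.
Why use the producer-consumer pattern
In the world of threads, a producer is a thread that produces data and a consumer is a thread that consumes it. In multithreaded development, if the producer is fast and the consumer is slow, the producer has to wait for the consumer to finish before it can produce more data. Likewise, if the consumer can process more than the producer can supply, the consumer has to wait for the producer. The producer-consumer pattern was introduced to solve this problem.
What the producer-consumer pattern is
The producer-consumer pattern uses a container to remove the tight coupling between producer and consumer. The two do not communicate with each other directly but through a blocking queue. After producing data, the producer does not wait for the consumer to process it; it simply drops the data into the blocking queue. The consumer does not ask the producer for data; it takes data straight from the blocking queue. The blocking queue acts as a buffer that balances the processing capacity of producer and consumer.
Here is a very basic producer-consumer example: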
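'''The producer-consumer model uses two threads, one producing and one consuming, that cooperate through a shared queue.'''
import queue
import threading
import time


def producer(name):
    '''Producer: keeps putting bones into the queue.'''
    count = 1  # number of the next bone
    while True:
        q.put("bone %s" % count)  # produce a bone
        print("[%s] produced bone %s" % (name, count))
        count += 1  # one bone per iteration
        time.sleep(0.5)  # production rate: one bone every 0.5 s


def consumer(name):
    '''Consumer: keeps taking bones from the queue.'''
    while True:
        print("\033[31m[%s] ate [%s]\033[0m" % (name, q.get()))
        time.sleep(1)  # consumption rate: one bone per second


if __name__ == "__main__":
    try:
        q = queue.Queue(maxsize=10)  # create a Queue with a maximum capacity of 10
        p = threading.Thread(target=producer, args=("geng",))  # producer thread
        p.start()
    except KeyboardInterrupt as f:
        print("Producer thread interrupted!!")
    try:
        c = threading.Thread(target=consumer, args=("alex",))  # consumer thread
        c.start()
    except KeyboardInterrupt as e:
        print("Thread interrupted!!!")
Output:
[geng] produced bone 1
[alex] ate [bone 1]
[geng] produced bone 2
[geng] produced bone 3
[alex] ate [bone 2]
[geng] produced bone 4
[alex] ate [bone 3]
[geng] produced bone 5
[geng] produced bone 6
[alex] ate [bone 4]
[geng] produced bone 7
[geng] produced bone 8
[alex] ate [bone 5]
[geng] produced bone 9
[geng] produced bone 10
[alex] ate [bone 6]
[geng] produced bone 11
[geng] produced bone 12
[alex] ate [bone 7]
[geng] produced bone 13
[geng] produced bone 14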
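The above is a simple producer-consumer model built on queue. The producer makes goods and puts them into the queue; the consumer takes them out of the queue. Producing and taking continuously in this way is the basic producer-consumer model. The time.sleep() calls set the production and consumption rates and so control the pace of the program. count += 1 reflects the producer's output: one item per loop iteration. If the producer made 10 items per iteration instead, production would far outpace consumption, the queue would fill up, and the producer would spend a long time waiting after each batch. Changing the rate in this way requires modifying the code.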
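The example above runs forever. A variant that stops on its own can be sketched as follows, assuming a sentinel value (here None) is agreed on as the "no more data" marker. The SENTINEL name, the item count of 5 and the maxsize of 3 are illustrative choices, not something the queue module prescribes.

```python
import queue
import threading

SENTINEL = None  # assumed "no more data" marker, agreed on by both threads


def producer(q, n):
    """Put n items into q, then signal the end with SENTINEL."""
    for i in range(1, n + 1):
        q.put("bone %s" % i)  # blocks while the queue is full
    q.put(SENTINEL)


def consumer(q, eaten):
    """Take items from q until SENTINEL arrives."""
    while True:
        item = q.get()  # blocks while the queue is empty
        if item is SENTINEL:
            break
        eaten.append(item)


q = queue.Queue(maxsize=3)  # small buffer so the producer has to wait at times
eaten = []
p = threading.Thread(target=producer, args=(q, 5))
c = threading.Thread(target=consumer, args=(q, eaten))
p.start()
c.start()
p.join()
c.join()
print(eaten)  # ['bone 1', 'bone 2', 'bone 3', 'bone 4', 'bone 5']
```

Because the queue is passed in as an argument, the two functions no longer depend on a global q, and the FIFO order guarantees the consumer sees the items in the order they were produced.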
The source code of the queue module is as follows:
'''A multi-producer, multi-consumer queue.'''

try:
    import threading
except ImportError:
    import dummy_threading as threading
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time

__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']

class Empty(Exception):
    'Exception raised by Queue.get(block=0)/get_nowait().'
    pass

class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
    pass

class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()


class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)


class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()
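The task_done() and join() methods in the source above are not used anywhere earlier in this post. A minimal sketch of how they fit together, assuming three worker threads that square numbers (the worker function, the lock and the results list are illustrative names):

```python
import queue
import threading

q = queue.Queue()
results = []
results_lock = threading.Lock()  # protects results across worker threads


def worker():
    """Illustrative worker: square each number taken from the queue."""
    while True:
        n = q.get()
        try:
            with results_lock:
                results.append(n * n)
        finally:
            q.task_done()  # one task_done() for every successful get()


# Daemon threads, so the idle workers do not keep the program alive at exit.
for _ in range(3):
    threading.Thread(target=worker, daemon=True).start()

for n in range(5):
    q.put(n)

q.join()  # returns once task_done() has been called for all 5 items
print(sorted(results))  # [0, 1, 4, 9, 16]
```

join() waits on the unfinished_tasks counter shown in the source: put() increments it and task_done() decrements it, so join() returns only after every item has been both taken and processed.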
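Reading the source code gives a better understanding of these methods. From now on I will read more source code and learn from how it is written; it contains many good coding habits and formatting conventions.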