封裝的Redis隊列 MyRedisQueue.py 接收端 發送端 ...
封裝的Redis隊列
MyRedisQueue.py
#!usr/bin/env python2.7
# -*- coding: utf-8 -*-
import redis
class RedisQueue(object):
def __init__(self, name, namespace='queue', **redis_kwargs):
# redis的預設參數為:host='localhost', port=6379, db=0, 其中db為定義redis database的數量
# r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 加上decode_responses=True,寫入的鍵值對中的value為str類型,不加這個參數寫入的則為位元組類型
# host是redis主機,需要redis服務端和客戶端都啟動 redis預設埠是6379
self.__db = redis.Redis(**redis_kwargs)
self.key = '%s:%s' % (namespace, name)
def qsize(self):
return self.__db.llen(self.key) # 返回隊列裡面list內元素的數量
def put(self, item, timeout=None):
self.__db.rpush(self.key, item) # 添加新元素到隊列最右方
if isinstance(timeout, int):
self.__db.expire(self.key, timeout)
def get_wait(self, timeout=None):
# 返回隊列第一個元素,如果為空則等待至有元素被加入隊列(超時時間閾值為timeout,如果為None則一直等待)
item = self.__db.blpop(self.key, timeout=timeout)
# if item:
# item = item[1] # 返回值為一個tuple
return item
def get_nowait(self):
# 直接返回隊列第一個元素,如果隊列為空返回的是None
item = self.__db.lpop(self.key)
return item
def get_all(self):
items = []
while True:
result = self.get_nowait()
if result:
items.append(eval(result))
else:
break
return items
接收端
#!usr/bin/env python2.7
# -*- coding: utf-8 -*-
from MyRedisQueue import RedisQueue
queue_name = "q1"
retCode = {"status": {"code": 0, "msg": "success"}}
redis_queue = RedisQueue(queue_name)
ret = redis_queue.get_wait(30) # 阻塞等待30s,直到隊列中有元素進來
if ret is None:
retCode["status"]["code"] = 2
retCode["status"]["msg"] = "超時未響應"
發送端
#!usr/bin/env python2.7
# -*- coding: utf-8 -*-
from MyRedisQueue import RedisQueue
queue_name = "q1"
redis_queue = RedisQueue(queue_name)
redis_queue.put("all done")