celery是用python寫的一個非同步的任務框架,功能非常強大,具體的說明可以查看官網,這裡主要提供點demo讓你迅速使用該框架 1.環境安裝 預設安裝好了redis pip install celery pip install celery redis 用來作為任務消息的載體 2. tasks. ...
celery是用python寫的一個非同步的任務框架,功能非常強大,具體的說明可以查看官網,這裡主要提供點demo讓你迅速使用該框架 1.環境安裝 預設安裝好了redis pip install celery redis 用來作為任務消息的載體 2. tasks.py
import sys reload(sys) sys.setdefaultencoding('utf-8’) # 不加這句話,列印中文log會出錯 from celery import Celery celery = Celery('tasks', broker='redis://127.0.0.1:6379/0') #選擇本地redis db=0 作為消息載體, 第一個參數為任務名稱 from celery.utils.log import get_task_logger # 倒入celery 中的log模塊 logger = get_task_logger(__name__) @celery.task(bind=True, max_retries=10, default_retry_delay=1 * 6) # bind 表示開啟, max_retries 是重新嘗試的次數,default_retry_delay 是預設的間隔時間,嘗試的時間 def exec_task_order_overtime(self, order_id): # 訂單到期後,執行訂單失效的任務 try: logger.info('===================> exec_task_order_overtime order_id=%s' % order_id) success = BaseHandler.context_services.order_overtime_task_service.process_over_time(order_id) if success is False: logger.error( '<================order_overtime_task_service.process_over_time Failed, order_id=%s' % order_id) raise Return(False) else: logger.info( '<=================order_overtime_task_service.process_over_time Success, order_id=%s' % order_id) except Exception as exc: logger.info('exec_task_order_overtime retry, order_id=%s' % order_id) raise self.retry(exc=exc, countdown=3) # 3 秒後繼續嘗試, 此處的countdown 優先順序高於裝飾器中的default_retry_delay
該文件路徑下執行命令, 此後celery 開始作為消費之執行任務 $celery worker -A tasks --loglevel=info 生產者呢?執行下麵語句後,redis中的db=0 的 celery key是一個 list 類型, 裡面存放著執行任務,如果celery沒有開啟可以清晰看到;開啟了celery可能已經被執行了
from celery import Celery celery = Celery('tasks', broker='redis://127.0.0.1:6379/0') #消息載體 push_task_id = celery.send_task('tasks.exec_task_order_overtime' , [order_id] # 參數,必須為list,具體可見源碼,第三個可以為dict,我們這裡沒有使用 , countdown=10) #延時多久執行 推送消息
疑問1:
有的人會奇怪為什麼exec_task_order_overtime有self, 有時候發現沒有,區別在與裝飾器task,如果@celery.task, 不是裝飾器函數調用,則沒有self,The bind argument to the task decorator will give access toself
(the task type instance)
疑問2:
celery 的構造函數的參數,第一個為模塊名,也就是本文件名稱, 第二個為redis載體地址,看官網介紹
The first argument to Celery
is the name of the current module, this only needed so names can be automatically generated when the tasks are defined in the __main__module.
The second argument is the broker keyword argument, specifying the URL of the message broker you want to use. Here using RabbitMQ (also the default option).
疑問3:
celery記憶體不足時,沒有反饋機制:
我們知道socket網路傳輸中,當接受端來不及處理的時候,發送端會阻塞;celery 完全沒有相關機制;
我們不需要發送端阻塞,當然也不能,但是我們的celery來不及處理時,應該緩存一些在redis中,雖然會造成消息處理不及時,但是也不至於記憶體不足的問題出現,這裡可能需要自己做一些處理;
例如:retry次數控制儘可能少,任務執行失敗後記錄下來,根據業務需要比如10分鐘後再次拋入redis消息隊列中