純粹是記錄一下自己在剛開始使用的時候遇到的一些坑,以及自己是怎樣通過配合redis來解決問題的。文章分為三個部分,一是怎樣跑起來,並且怎樣監控相關的隊列和任務;二是遇到的幾個坑;三是給一些自己配合redis使用的代碼示例。 一.celery使用: Ⅰ.把任務中間件伺服器跑起來,rabbitmq-se ...
純粹是記錄一下自己在剛開始使用的時候遇到的一些坑,以及自己是怎樣通過配合redis來解決問題的。
文章分為三個部分,一是怎樣跑起來,並且怎樣監控相關的隊列和任務;二是遇到的幾個坑;三是給一些自己配合redis使用的代碼示例。
一.celery使用:
Ⅰ.把任務中間件伺服器跑起來,rabbitmq-server
跑起來以後,就能在瀏覽器(http://localhost:15672/#/queues)裡面看中間件裡面的相關內容了。
(如果想把這邊的某些隊列下麵的沒有跑完的任務丟棄掉的話,進對應的隊列,點擊delete)
Ⅱ.在終端裡面對應路徑下麵把celery的任務隊列跑起來:
eg:celery -A celery_autowork_task worker --loglevel=info -Q auto_work
註意裡面的各個參數的意義
Ⅲ.看具體的每個任務的執行情況,是否成功之類的
1.安裝flower,pip3 install flower
2.用celery把flower跑起來celery flower
3.http://localhost:5555在這個鏈接,或者終端裡面看見的就能看具體的每個任務的執行情況
二.坑:
Ⅰ.異常場景:併發數設置成4,然後第一次進去隊列的時候是用的第一個,retry的時候,居然在4和2裡面都觸發了任務。也就是關了一個,開了兩個。
異常原因:在exception的代碼段裡面加了retry功能, 然後在try的代碼段裡面也加了try功能,但是查看retry函數本身的定義,發現它的類型就是exception;也就是調用這個本身就是一個異常會走到處理異常的代碼段裡面去;所以更好的方式是只在exception的代碼段裡面retry,然後如果try裡面需要retry的話,直接用raise 搞一個異常出來;
Ⅱ.異常場景:在task裝飾器裡面定義重試間隔時間default_retry_delay,或者在retry裡面指定countdown為5秒,但是在終端裡面看見的一個任務的兩次開始執行時間發現都會大於5秒。開始還以為是不是兩個設置會互相影響。
異常原因:後來發現的default_retry_dalay是在retry的時候沒有傳參數的的預設重試間隔時間,可以看模塊代碼http://docs.celeryproject.org/en/latest/_modules/celery/app/task.html?highlight=task;但之所以一個任務的兩次開始執行時間大於5秒是因為,我們的retry一般都寫在exception的代碼塊裡面,而這個間隔時間,其實指的是,執行到retry的時間往後推5秒。try裡面的操作會有耗時,所以導致了兩次任務的開始時間大於5秒。那麼如果想要兩次的開始時間為5秒的話,可以在進入任務的時候進行一下計時,然後任務重試的時候看整個try的代碼塊的耗時。
Ⅲ.異常場景:在task裝飾器裡面提供了max_retries最大重試次數,然後到達最大重試次數(比如10次)以後,在第十次的重試裡面仍然會走到retry方法,這個時候,居然在終端開間異常拋出了。
異常原因:按我的個人理解,這種設定了10次重試次數,但是在第十次開啟第11次的重試時候,應該模塊是自動不讓重試才對的。可是貌似沒有完成,或者可能是我沒有搜索到正確的使用方法;解決辦法就是自己在重試之前看當前的重試次數,自己控制
Ⅳ.異常場景:如果一次任務在第一次的發起,重試次數還沒到的時候想要第二次發起怎麼辦?就是一般來說還是只允許一個實際任務只有一個隊列裡面的任務處理它
處理辦法:不管是delay還是apply_async方法都是有返回值的,這個返回值的是celery裡面的一個類,可以string以後得到任務的ID,然後之後用app.control裡面的revoke方法把對應的任務中斷掉。可以查看對應模塊的源代碼http://docs.celeryproject.org/en/latest/_modules/celery/app/control.html
三.代碼示例
celery_autowork_task.py【任務執行方法的定義的地方】
from celery import Celery AutoWork = Celery('auto_work', broker = 【CELERY_BROKER】, backend = '') AutoWork.conf.CELERY_TIMEZONE = 'Asia/Shanghai' # 時區 AutoWork.conf.CELERYD_CONCURRENCY = 4 # 任務併發數 AutoWork.conf.CELERYD_TASK_SOFT_TIME_LIMIT = 300 # 任務超時時間 AutoWork.conf.CELERY_DISABLE_RATE_LIMITS = True # 任務頻率限制開關 AutoWork.conf.CELERY_ROUTES = { # 任務調度隊列 "autowork_check_barcode_recharge":{"queue":"auto_work"}, } @AutoWork.task(bind=True,name="autowork_check_barcode_recharge",max_retries=15) def autowork_check_barcode_recharge(self,recharge_id): time_begin=datetime.datetime.now() try: pass except Exception as exc: retries=self.request.retries if retries<self.max_retries: delta_second=(datetime.datetime.now()-time_begin).seconds if delta_second<5: return self.retry(exc = exc,countdown=5-delta_second) else: return self.retry(exc = exc,countdown=0) finally: pass
test.py【調用任務以及檢查任務的執行情況】
def cancel_pre_celery_task_and_excute_next_task(self,recharge_record): recharge_work_guid="recharge_work_guid:%d:%s"%(recharge_record.shop_id,recharge_record.num) from handlers.celery_autowork_task import autowork_check_barcode_recharge if redis.get(recharge_work_guid): #上次是否有執行這個實際業務中的任務 from celery_autowork_task import AutoWork AutoWork.control.revoke(redis.get(recharge_work_guid).decode('utf-8'),terminate=True) #如果有執行,則中斷 autowork_guid=autowork_check_barcode_recharge.delay(recharge_record.id) #得到本次的任務GUID redis.set(recharge_work_guid,str(autowork_guid)) #保存到redis裡面去 redis.expire(recharge_work_guid,3600)