二、Celery基本使用 1.創建一個celery application 用來定義你的任務列表,創建一個任務文件就叫tasks.py吧。 from celery import Celery # 配置好celery的backend和broker app = Celery('task1', backe ...
一、Celery 核心模塊
1. Brokers
brokers 中文意思為中間人,在這裡就是指任務隊列本身,接收生產者發來的消息即Task,將任務存入隊列。任務的消費者是Worker,Brokers 就是生產者和消費者存放/拿取產品的地方(隊列)。Celery 扮演生產者和消費者的角色。
常見的 brokers 有 rabbitmq、redis、Zookeeper 等。推薦用Redis或RabbitMQ實現隊列服務。
2. Workers
就是 Celery 中的工作者,執行任務的單元,類似與生產/消費模型中的消費者。它實時監控消息隊列,如果有任務就從隊列中取出任務並執行它。
3. Backend / Result Stores
用於存儲任務的執行結果。隊列中的任務運行完後的結果或者狀態需要被任務發送者知道,那麼就需要一個地方儲存這些結果,就是 Result Stores 了。
常見的 backend 有 redis、Memcached 甚至常用的資料庫都可以。
4. Tasks
就是想在隊列中進行的任務,有非同步任務和定時任務。一般由用戶、觸發器或其他操作將任務入隊,然後交由 workers 進行處理。
5. Beat
定時任務調度器,根據配置定時將任務發送給Brokers。
二、Celery 基本使用
1.創建一個celery application 用來定義你的任務列表,創建一個任務文件就叫tasks.py吧。
from celery import Celery # 配置好celery的backend和broker app = Celery('task1', backend='redis://127.0.0.1:6379/0', broker='redis://127.0.0.1:6379/0') #普通函數裝飾為 celery task @app.task def add(x, y): return x + y
如此而來,我們只是定義好了任務函數func函數和worker(celery對象)。worker相當於工作者。
2.啟動Celery Worker來開始監聽並執行任務。broker 我們有了,backend 我們有了,task 我們也有了,現在就該運行 worker 進行工作了,在 tasks.py 所在目錄下運行:
[root@localhost ~]# celery -A tasks worker --loglevel=info # 啟動方法1 [root@localhost ~]# celery -A tasks worker --l debug # 啟動方法2
現在 tasks 這個任務集合的 worker 在進行工作(當然此時broker中還沒有任務,worker此時相當於待命狀態),如果隊列中已經有任務了,就會立即執行。
3.調用任務:要給Worker發送任務,需要調用 delay() 方法。
import time from tasks import add # 不要直接add(6, 6),這裡需要用 celery 提供的介面 delay 進行調用 result = add.delay(6, 6) while not result.ready(): time.sleep(1) print('task done: {0}'.format(result.get()))
三、Celery 進階使用
1.celery_config.py:配置文件
from __future__ import absolute_import, unicode_literals #從python的絕對路徑導入而不是當前的腳本 #在python2和python3做相容支持的 BROKER_URL = 'redis://127.0.0.1:6379/0' # 指定結果的接受地址 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
2.tasks.py
from __future__ import absolute_import, unicode_literals #從python的絕對路徑導入而不是當前的腳本 #在python2和python3做相容支持的 from celery import Celery # 配置好celery的backend和broker, task1:app的名字。broker app = Celery('task1', # broker='redis://127.0.0.1:6379/0', # 消息隊列:連rabbitmq或redis backend='redis://127.0.0.1:6379/0') # 存儲結果:redis或mongo或其他資料庫 app.config_from_object("celery_config") app.conf.update( # 給app設置參數 result_expires=3600, # 保存時間為1小時 ) #普通函數裝飾為 celery task @app.task def add(x, y): return x + y if __name__ == '__main__': app.start()
3.啟動worker
[root@localhost ~]# celery -A tasks worker --loglevel=info
4.test.py
# -*- coding:utf-8 -*- import time from tasks import add # 不要直接add(4, 4),這裡需要用 celery 提供的介面 delay 進行調用 result = add.delay(6, 6) print(result.id) while not result.ready(): time.sleep(1) print('task done: {0}'.format(result.get()))
四、Celery 定時任務
參考:https://www.cnblogs.com/forward-wang/p/5970806.html
參考:https://www.cnblogs.com/shizhengwen/p/6911043.html
參考:https://blog.51cto.com/steed/2292346?source=dra
參考:https://blog.csdn.net/qq_37049050/article/details/82260151
參考:https://www.cnblogs.com/zhangbingsheng/p/10384517.html
參考:https://www.cnblogs.com/cwp-bg/p/8759638.html