一、Celery 核心模塊 1. Brokers brokers 中文意思為中間人,在這裡就是指任務隊列本身,接收生產者發來的消息即Task,將任務存入隊列。任務的消費者是Worker,Brokers 就是生產者和消費者存放/拿取產品的地方(隊列)。Celery 扮演生產者和消費者的角色。 常見的 ...
一、Celery 核心模塊
1. Brokers
brokers 中文意思為中間人,在這裡就是指任務隊列本身,接收生產者發來的消息即Task,將任務存入隊列。任務的消費者是Worker,Brokers 就是生產者和消費者存放/拿取產品的地方(隊列)。Celery 扮演生產者和消費者的角色。
常見的 brokers 有 rabbitmq、redis、Zookeeper 等。推薦用Redis或RabbitMQ實現隊列服務。
2. Workers
就是 Celery 中的工作者,執行任務的單元,類似與生產/消費模型中的消費者。它實時監控消息隊列,如果有任務就從隊列中取出任務並執行它。
3. Backend / Result Stores
用於存儲任務的執行結果。隊列中的任務運行完後的結果或者狀態需要被任務發送者知道,那麼就需要一個地方儲存這些結果,就是 Result Stores 了。
常見的 backend 有 redis、Memcached 甚至常用的資料庫都可以。
4. Tasks
就是想在隊列中進行的任務,有非同步任務和定時任務。一般由用戶、觸發器或其他操作將任務入隊,然後交由 workers 進行處理。
5. Beat
定時任務調度器,根據配置定時將任務發送給Brokers。
二、Celery 基本使用
1.創建一個celery application 用來定義你的任務列表,創建一個任務文件就叫tasks.py吧。
from celery import Celery
# 配置好celery的backend和broker
app = Celery('task1', backend='redis://127.0.0.1:6379/0', broker='redis://127.0.0.1:6379/0')
#普通函數裝飾為 celery task
@app.task
def add(x, y):
return x + y
如此而來,我們只是定義好了任務函數func函數和worker(celery對象)。worker相當於工作者。
2.啟動Celery Worker來開始監聽並執行任務。broker 我們有了,backend 我們有了,task 我們也有了,現在就該運行 worker 進行工作了,在 tasks.py 所在目錄下運行:
[root@localhost ~]# celery -A tasks worker --loglevel=info # 啟動方法1
[root@localhost ~]# celery -A tasks worker --l debug # 啟動方法2
現在 tasks 這個任務集合的 worker 在進行工作(當然此時broker中還沒有任務,worker此時相當於待命狀態),如果隊列中已經有任務了,就會立即執行。
3.調用任務:要給Worker發送任務,需要調用 delay() 方法。
import time
from tasks import add
# 不要直接add(6, 6),這裡需要用 celery 提供的介面 delay 進行調用
result = add.delay(6, 6)
while not result.ready():
time.sleep(1)
print('task done: {0}'.format(result.get()))
三、Celery 進階使用
1.celery_config.py:配置文件
from __future__ import absolute_import, unicode_literals
#從python的絕對路徑導入而不是當前的腳本 #在python2和python3做相容支持的
BROKER_URL = 'redis://127.0.0.1:6379/0'
# 指定結果的接受地址
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
2.tasks.py
from __future__ import absolute_import, unicode_literals
#從python的絕對路徑導入而不是當前的腳本 #在python2和python3做相容支持的
from celery import Celery
# 配置好celery的backend和broker, task1:app的名字。broker
app = Celery('task1', #
broker='redis://127.0.0.1:6379/0', # 消息隊列:連rabbitmq或redis
backend='redis://127.0.0.1:6379/0') # 存儲結果:redis或mongo或其他資料庫
app.config_from_object("celery_config")
app.conf.update( # 給app設置參數
result_expires=3600, # 保存時間為1小時
)
#普通函數裝飾為 celery task
@app.task
def add(x, y):
return x + y
if __name__ == '__main__':
app.start()
3.啟動worker
[root@localhost ~]``# celery -A tasks worker --loglevel=info
4.test.py
import time
from tasks import add
# 不要直接add(4, 4),這裡需要用 celery 提供的介面 delay 進行調用
result = add.delay(6, 6)
print(result.id)
while not result.ready():
time.sleep(1)
print('task done: {0}'.format(result.get()))