1、Celery的概念 Celery 是一個 基於python開發的分散式非同步消息任務隊列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery, 舉幾個實例場景中可用的例子: 非同步任務:將耗時的操作任務提交給Celery去非同步執行,比如發送簡訊/郵件 ...
1、Celery的概念
Celery 是一個 基於python開發的分散式非同步消息任務隊列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery, 舉幾個實例場景中可用的例子:
非同步任務:將耗時的操作任務提交給Celery去非同步執行,比如發送簡訊/郵件、消息推送、音頻處理等等
定時任務: 每天定時執行爬蟲爬取指定內容
還可以使用celery實現簡單的分散式爬蟲系統等等
Celery 在執行任務時需要通過一個消息中間件(Broker)來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis,
2、Celery的特點
- 簡單,易於使用和維護,有豐富的文檔。
- 高效,支持多線程、多進程、協程模式運行,單個celery進程每分鐘可以處理數百萬個任務。
- 靈活,celery中幾乎每個部分都可以自定義擴展。
3、Celery的作用
應用解耦,非同步處理,流量削鋒,消息通訊。
celery通過消息(任務)進行通信,
celery通常使用一個叫Broker(中間人/消息中間件/消息隊列/任務隊列)來協助clients(任務的發出者/客戶端)和worker(任務的處理者/工作進程)進行通信的.
clients發出消息到任務隊列中,broker將任務隊列中的信息派發給worker來處理。
client ---> 消息 --> Broker(消息隊列) -----> 消息 ---> worker(celery運行起來的工作進程)
消息隊列(Message Queue),也叫消息隊列中間件,簡稱消息中間件,它是一個獨立運行的程式,表示在消息的傳輸過程中臨時保存消息的容器。
所謂的消息,是指代在兩台電腦或2個應用程式之間傳送的數據。消息可以非常簡單,例如文本字元串或者數字,也可以是更複雜的json數據或hash數據等。
所謂的隊列,是一種先進先出、後進呼後出的數據結構,python中的list數據類型就可以很方便地用來實現隊列結構。
目前開發中,使用較多的消息隊列有RabbitMQ,Kafka,RocketMQ,MetaMQ,ZeroMQ,ActiveMQ等,當然,像redis、mysql、MongoDB,也可以充當消息中間件,但是相對而言,沒有上面那麼專業和性能穩定。
併發任務10k以下的,直接使用redis
併發任務10k以上,1000k以下的,直接使用RabbitMQ
併發任務1000k以上的,直接使用RocketMQ
4、Celery的運行架構
Celery的運行架構由三部分組成,消息隊列(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
一個celery系統可以包含很多的worker和broker
Celery本身不提供消息隊列功能,但是可以很方便地和第三方提供的消息中間件進行集成,包括Redis,RabbitMQ,RocketMQ等
5、Celery執行流程圖
6、Celery安裝
pip install -U celery -i https://pypi.tuna.tsinghua.edu.cn/simple
註意:
Celery不建議在windows系統下使用,Celery在4.0版本以後不再支持windows系統,所以如果要在windows下使用只能安裝4.0以前的版本,而且即便是4.0之前的版本,在windows系統下也是不能單獨使用的,需要安裝gevent、geventlet或eventlet協程模塊
7、基本使用
使用celery第一件要做的最為重要的事情是需要先創建一個Celery實例對象,我們一般叫做celery應用對象,或者更簡單直接叫做一個app。app應用對象是我們使用celery所有功能的入口,比如啟動celery、創建任務,管理任務,執行任務等.
celery框架有2種使用方式
- 一種是單獨一個項目目錄,
- 另一種就是Celery集成到web項目框架中。
8、Celery 使用示例
8.1 Celery作為一個單獨項目運行
例如,mycelery代碼目錄直接放在項目根目錄下即可,路徑如下:
服務端項目根目錄/
└── mycelery/
├── settings.py # 配置文件
├── __init__.py
├── main.py # 入口程式
└── sms/ # 非同步任務目錄,這裡拿發送簡訊來舉例,一個類型的任務就一個目錄
└── tasks.py # 任務的文件,文件名必須是tasks.py!!!每一個任務就是一個被裝飾的函數,寫在任務文件中
main.py,代碼:
from celery import Celery
# 實例化celery應用,參數一般為項目應用名
app = Celery("blog")
# 通過app實例對象載入配置文件
app.config_from_object("mycelery.settings")
# 註冊任務, 自動搜索並載入任務
# 參數必須必須是一個列表,裡面的每一個任務都是任務的路徑名稱
# app.autodiscover_tasks(["任務1","任務2",....])
app.autodiscover_tasks(["mycelery.sms","mycelery.email"])
# 啟動Celery的終端命令
# 強烈建議切換目錄到項目的根目錄下啟動celery!!
# celery -A mycelery.main worker --loglevel=info
配置文件settings.py,代碼:
# 任務隊列的鏈接地址
broker_url = 'redis://127.0.0.1:6379/14'
# 結果隊列的鏈接地址
result_backend = 'redis://127.0.0.1:6379/15'
關於配置信息的官方文檔:https://docs.celeryproject.org/en/master/userguide/configuration.html
創建任務文件sms/tasks.py,任務文件名必須固定為"tasks.py",並創建任務,代碼:
from ..main import app
from utils.tencent_sms import send_message
"""
# 如果不添加name參數,啟動celery後的任務名稱就會很長
[tasks]
. mycelery.sms.tasks.send_sms1
. mycelery.sms.tasks.send_sms3
. send_sms2
. send_sms4
"""
@app.task(name="send_sms1")
def send_sms1():
"""沒有任務參數,沒有返回結果的非同步任務"""
print('任務:send_sms1執行了。。。')
# s2 = send.sms2.delay(15150404640,4526)
@app.task(name="send_sms2")
def send_sms2(mobile, code):
"""有任務參數,沒有返回結果的非同步任務"""
print(f'任務:send_sms2執行了。。。{mobile},code={code}')
# s3 = send.sms3.delay()
# s3.result = 100
@app.task(name="send_sms3")
def send_sms3():
"""沒有任務參數,有返回結果的非同步任務"""
print('任務:send_sms3執行了。。。')
return 100
# s4 = send.sms4.delay(10,15)
# s4.result = 25
@app.task(name="send_sms4")
def send_sms4(x, y):
"""有參數,有結果的非同步任務"""
print('任務:send_sms4執行了...')
return x + y
@app.task(name='send_sms')
def send_sms(mobile, random_code):
"""發送簡訊"""
return send_message(mobile, random_code)
接下來,我們運行celery。
cd ~/Desktop/blog/blogapi
# 普通的運行方式[預設多進程,卡終端,按CPU核數+1創建進程數]
# ps aux|grep celery
celery -A mycelery.main worker --loglevel=info
# 啟動多工作進程,以守護進程的模式運行[一個工作進程就是4個子進程]
# 註意:pidfile和logfile必須以絕對路徑來聲明
celery multi start worker -A mycelery.main -E --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker1.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log" -l info -n worker1
celery multi start worker -A mycelery.main -E --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker2.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log" -l info -n worker2
# 關閉運行的工作進程
celery multi stop worker -A mycelery.main --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker1.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log"
celery multi stop worker -A mycelery.main --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker2.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log"
效果如下:
調用上面的非同步任務,拿django的shell進行舉例:
# 因為celery模塊安裝在了虛擬環境中,所以要確保進入虛擬環境
conda activate base
cd /home/xuejian/Desktop/blog/blog
python manage.py shell
# 調用celery執行非同步任務
from mycelery.sms.tasks import send_sms1,send_sms2,send_sms3,send_sms4
mobile = "13312345656"
code = "666666"
# delay 表示馬上按順序來執行非同步任務,在celrey的worker工作進程有空閑的就立刻執行
# 可以通過delay非同步調用任務,可以沒有參數
ret1 = send_sms1.delay()
# 可以通過delay傳遞非同步任務的參數,可以按位置傳遞參數,也可以使用命名參數
# ret2 = send_sms.delay(mobile=mobile,code=code)
ret2 = send_sms2.delay(mobile,code)
# apply_async 讓任務在後面指定時間後執行,時間單位:秒/s
# 任務名.apply_async(args=(參數1,參數2), countdown=定時時間)
ret4 = send_sms4.apply_async(kwargs={"x":10,"y":20},countdown=30)
# 根據返回結果,不管delay,還是apply_async的返回結果都一樣的。
ret4.id # 返回一個UUID格式的任務唯一標誌符,78fb827e-66f0-40fb-a81e-5faa4dbb3505
ret4.status # 查看當前任務的狀態 SUCCESS表示成功! PENDING任務等待
ret4.get() # 獲取任務執行的結果[如果任務函數中沒有return,則沒有結果,如果結果沒有出現則會導致阻塞]
if ret4.status == "SUCCESS":
print(ret4.get())
接下來,我們讓celery可以調度第三方框架的代碼,這裡拿django當成一個第三模塊調用進行舉例。
在main.py主程式中對django進行導包引入,並設置django的配置文件進行django的初始化。
import os,django
from celery import Celery
# 初始化django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blog.settings')
django.setup()
# 初始化celery對象
app = Celery("blog")
# 載入配置
app.config_from_object("mycelery.config")
# 自動註冊任務
app.autodiscover_tasks(["mycelery.sms","mycelery.email"])
# 運行celery
# 終端下: celery -A mycelery.main worker -l info
在需要使用django配置的任務中,直接載入配置,所以我們把註冊的簡訊發送功能,整合成一個任務函數,mycelery.sms.tasks,代碼:
from ..main import app
from ronglianyunapi import send_sms as send_sms_to_user
@app.task(name="send_sms1")
def send_sms1():
"""沒有任何參數,沒有返回結果的非同步任務"""
print('任務:send_sms1執行了...')
@app.task(name="send_sms2")
def send_sms2(mobile, code):
"""有參數,沒有返回結果的非同步任務"""
print(f'任務:send_sms2執行了...mobile={mobile}, code={code}')
@app.task(name="send_sms3")
def send_sms3():
"""沒有任何參數,有返回結果的非同步任務"""
print('任務:send_sms3執行了...')
return 100
@app.task(name="send_sms4")
def send_sms4(x,y):
"""有結果的非同步任務"""
print('任務:send_sms4執行了...')
return x+y
@app.task(name="send_sms")
def send_sms(tid, mobile, datas):
"""發送簡訊"""
print("發送簡訊")
return send_sms_to_user(tid, mobile, datas)
最終在django的視圖裡面,我們調用Celery來非同步執行任務。
只需要完成2個步驟,分別是導入非同步任務和調用非同步任務。users/views.py,代碼:
import random
from django_redis import get_redis_connection
from django.conf import settings
# from ronglianyunapi import send_sms
from mycelery.sms.tasks import send_sms
"""
/users/sms/(?P<mobile>1[3-9]\d{9})
"""
class SMSAPIView(APIView):
"""
SMS簡訊介面視圖
"""
def get(self, request, mobile):
"""發送簡訊驗證碼"""
redis = get_redis_connection("sms_code")
# 判斷手機簡訊是否處於發送冷卻中[60秒只能發送一條]
interval = redis.ttl(f"interval_{mobile}") # 通過ttl方法可以獲取保存在redis中的變數的剩餘有效期
if interval != -2:
return Response({"errmsg": f"簡訊發送過於頻繁,請{interval}秒後再次點擊獲取!", "interval": interval},status=status.HTTP_400_BAD_REQUEST)
# 基於隨機數生成簡訊驗證碼
# code = "%06d" % random.randint(0, 999999)
code = f"{random.randint(0, 999999):06d}"
# 獲取簡訊有效期的時間
time = settings.RONGLIANYUN.get("sms_expire")
# 簡訊發送間隔時間
sms_interval = settings.RONGLIANYUN["sms_interval"]
# 調用第三方sdk發送簡訊
# send_sms(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
# 非同步發送簡訊
send_sms.delay(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
# 記錄code到redis中,並以time作為有效期
# 使用redis提供的管道對象pipeline來優化redis的寫入操作[添加/修改/刪除]
pipe = redis.pipeline()
pipe.multi() # 開啟事務
pipe.setex(f"sms_{mobile}", time, code)
pipe.setex(f"interval_{mobile}", sms_interval, "_")
pipe.execute() # 提交事務,同時把暫存在pipeline的數據一次性提交給redis
return Response({"errmsg": "OK"}, status=status.HTTP_200_OK)
上面就是使用celery並執行非同步任務的第一種方式,適合在一些無法直接集成celery到項目中的場景。
8.2 Celery作為第三方模塊集成到項目中
這裡還是拿django來舉例,目錄結構調整如下:
blogapi/ # 服務端項目根目錄
└── blogapi/ # 主應用目錄
├── apps/ # 子應用存儲目錄
├ └── users/ # django的子應用
├ └── tasks.py # [新增]分散在各個子應用下的非同步任務模塊
├── settings/ # [修改]django的配置文件存儲目錄[celery的配置信息填寫在django配置中即可]
├── __init__.py # [修改]設置當前包目錄下允許外界調用celery應用實例對象
└── celery.py # [新增]celery入口程式,相當於上一種用法的main.py
blog/celery.py,主應用目錄下創建cerley入口程式,創建celery對象並載入配置和非同步任務,代碼:
import os
from celery import Celery
# 必須在實例化celery應用對象之前執行
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'blog.settings')
# 實例化celery應用對象
app = Celery('blog')
# 指定任務的隊列名稱
app.conf.task_default_queue = 'Celery'
# 也可以把配置寫在django的項目配置中
app.config_from_object('django.conf:settings', namespace='CELERY') # 設置django中配置信息以 "CELERY_"開頭為celery的配置信息
# 自動根據配置查找django的所有子應用下的tasks任務文件
app.autodiscover_tasks()
settings/dev.py,django配置中新增celery相關配置信息,代碼:
# Celery非同步任務隊列框架的配置項[註意:django的配置項必須大寫,所以這裡的所有配置項必須全部大寫]
# 任務隊列
CELERY_BROKER_URL = 'redis://:[email protected]:6379/14'
# 結果隊列
CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/15'
# 時區,與django的時區同步
CELERY_TIMEZONE = TIME_ZONE
# 防止死鎖
CELERY_FORCE_EXECV = True
# 設置併發的worker數量
CELERYD_CONCURRENCY = 200
# 設置失敗允許重試[這個慎用,如果失敗任務無法再次執行成功,會產生指數級別的失敗記錄]
CELERY_ACKS_LATE = True
# 每個worker工作進程最多執行500個任務被銷毀,可以防止記憶體泄漏,500是舉例,根據自己的伺服器的性能可以調整數值
CELERYD_MAX_TASKS_PER_CHILD = 500
# 單個任務的最大運行時間,超時會被殺死[慎用,有大文件操作、長時間上傳、下載任務時,需要關閉這個選項,或者設置更長時間]
CELERYD_TIME_LIMIT = 10 * 60
# 任務發出後,經過一段時間還未收到acknowledge, 就將任務重新交給其他worker執行
CELERY_DISABLE_RATE_LIMITS = True
# celery的任務結果內容格式
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
# 之前定時任務(定時一次調用),使用了apply_async({}, countdown=30);
# 設置定時任務(定時多次調用)的調用列表,需要單獨運行SCHEDULE命令才能讓celery執行定時任務:celery -A mycelery.main beat,當然worker還是要啟動的
# https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
"user-add": { # 定時任務的註冊標記符[必須唯一的]
"task": "add", # 定時任務的任務名稱
"schedule": 10, # 定時任務的調用時間,10表示每隔10秒調用一次add任務
# "schedule": crontab(hour=7, minute=30, day_of_week=1),, # 定時任務的調用時間,每周一早上7點30分調用一次add任務
}
}
blog/__init__.py
,主應用下初始化,代碼:
import pymysql
from .celery import app as celery_app
pymysql.install_as_MySQLdb()
__all__ = ['celery_app']
users/tasks.py,代碼:
from celery import shared_task
from ronglianyunapi import send_sms as sms
# 記錄日誌:
import logging
logger = logging.getLogger("django")
@shared_task(name="send_sms")
def send_sms(tid, mobile, datas):
"""非同步發送簡訊"""
try:
return sms(tid, mobile, datas)
except Exception as e:
logger.error(f"手機號:{mobile},發送簡訊失敗錯誤: {e}")
@shared_task(name="send_sms1")
def send_sms1():
print("send_sms1執行了!!!")
django中的用戶發送簡訊,就可以改成非同步發送簡訊了。
users/views,視圖中調用非同步發送簡訊的任務,代碼:
from .tasks import send_sms
send_sms.delay(settings.RONGLIANYUN.get("reg_tid"),mobile, datas=(code, time // 60))
users/views.py,非同步發送信息的完整視圖,代碼:
import random
from django_redis import get_redis_connection
from django.conf import settings
# from ronglianyunapi import send_sms
# from mycelery.sms.tasks import send_sms
from .tasks import send_sms
"""
/users/sms/(?P<mobile>1[3-9]\d{9})
"""
class SMSAPIView(APIView):
"""
SMS簡訊介面視圖
"""
def get(self, request, mobile):
"""發送簡訊驗證碼"""
redis = get_redis_connection("sms_code")
# 判斷手機簡訊是否處於發送冷卻中[60秒只能發送一條]
interval = redis.ttl(f"interval_{mobile}") # 通過ttl方法可以獲取保存在redis中的變數的剩餘有效期
if interval != -2:
return Response({"errmsg": f"簡訊發送過於頻繁,請{interval}秒後再次點擊獲取!", "interval": interval},status=status.HTTP_400_BAD_REQUEST)
# 基於隨機數生成簡訊驗證碼
# code = "%06d" % random.randint(0, 999999)
code = f"{random.randint(0, 999999):06d}"
# 獲取簡訊有效期的時間
time = settings.RONGLIANYUN.get("sms_expire")
# 簡訊發送間隔時間
sms_interval = settings.RONGLIANYUN["sms_interval"]
# 調用第三方sdk發送簡訊
# send_sms(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
# 非同步發送簡訊
send_sms.delay(settings.RONGLIANYUN.get("reg_tid"), mobile, datas=(code, time // 60))
# 記錄code到redis中,並以time作為有效期
# 使用redis提供的管道對象pipeline來優化redis的寫入操作[添加/修改/刪除]
pipe = redis.pipeline()
pipe.multi() # 開啟事務
pipe.setex(f"sms_{mobile}", time, code)
pipe.setex(f"interval_{mobile}", sms_interval, "_")
pipe.execute() # 提交事務,同時把暫存在pipeline的數據一次性提交給redis
return Response({"errmsg": "OK"}, status=status.HTTP_200_OK)
終端下先啟動celery,在django項目根目錄下啟動。
cd ~/Desktop/blog/blogapi
# 1. 普通運行模式,關閉終端以後,celery就會停止運行
celery -A blogapi worker -l INFO
# 2. 啟動多worker進程模式,以守護進程的方式運行,不需要在意終端。但是這種運行模型,一旦停止,需要手動啟動。
celery multi start worker -A blogapi -E --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker1.pid" --logfile="/home/moluo/Desktop/blog/blogapi/logs/celery.log" -l info -n worker1
# 3. 啟動多worker進程模式
celery multi stop worker -A blogapi --pidfile="/home/moluo/Desktop/blog/blogapi/logs/worker1.pid"
還是可以在django終端下調用celery的
$ python manage.py shell
>>> from users.tasks import send_sms1
>>> res = send_sms1.delay()
>>> res = send_sms1.apply_async(countdown=15)
>>> res.id
'893c31ab-e32f-44ee-a321-8b07e9483063'
>>> res.state
'SUCCESS'
>>> res.result
關於celery中非同步任務發佈的2個方法的參數如下:
非同步任務名.delay(*arg, **kwargs)
非同步任務名.apply_async((arg,), {'kwarg': value}, countdown=60, expires=120)
定時任務的調用器啟動,可以在運行了worker以後,使用以下命令:
cd ~/Desktop/blog/blogapi
celery -A blogapi beat
beat調度器關閉了,則定時任務無法執行,如果worker工作進程關閉了,則celery關閉,保存在消息隊列中的任務就會囤積在那裡。
celery還有一些高階用法, 我們後面用到再提。
celery後面還可以使用supervisor進行後臺托管運行。還可以針對任務執行的情況和結果,使用flower來進行監控。celery失敗任務的重新嘗試執行。
supervisor會在celery以外關閉了以後,自動重啟celery。
本文來自博客園,作者:暮歌行,轉載請註明原文鏈接:https://www.cnblogs.com/xuejian123/p/17199559.html