Celery介紹、安裝、基本使用 一、Celery服務 什麼是Celery: Celery是一個簡單、靈活且可靠的,處理消息的分散式系統 Celery可以用來做什麼: 非同步任務 定時任務 延遲任務 Celery的運行原理: 可以不依賴任何服務,通過自身命令,啟動服務 celery服務為其他項目服務提 ...
目錄
Celery介紹、安裝、基本使用
一、Celery服務
什麼是Celery:
Celery是一個簡單、靈活且可靠的,處理消息的分散式系統
- Celery可以用來做什麼:
- 非同步任務
- 定時任務
- 延遲任務
Celery的運行原理:
- 可以不依賴任何服務,通過自身命令,啟動服務
- celery服務為其他項目服務提供非同步解決任務需求
# 註:會有兩個服務同時運行
- 項目服務
- celery服務
項目服務將需要非同步處理的任務交給celery服務,celery就會在需要時非同步完成項目的需求
'''
人是一個獨立運行的服務 | 醫院也是一個獨立運行的服務
正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題
人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立運行,人生病時,醫院就來解決人生病的需求
'''
1、celery架構
-
消息中間件:broker
- 提交的任務【函數】都放在這裡, celery本身不能提供消息中間件
- 需要藉助於第三方: redis或rabbitmq
-
任務執行單元:worker
- 真正執行任務的的地方,一個個進程中執行函數
-
結果儲存:backend
- 函數return的結果都存儲在這裡, celery本身不提供結果存儲
- 需要藉助於第三方: redis或rabbitmq
使用場景:
- 非同步執行:解決耗時任務
- 延遲執行:解決延遲任務
- 定時執行:解決周期任務
2、celery快速使用
Celery不支持在windows上直接運行,通過eventlet支持在win上運行
安裝:
pip install celery
pip install eventlet # windows需要安裝
快速使用:
- 1、第一步:創建一個py文件(main.py),用於實例化celery對象,編寫需要執行的函數
# 1、導入模塊
from celery import Celery
# 2、指定briker,用於存放提交的非同步任務
broker = 'redis://127.0.0.1:6379/1'
# 3、指定backend,用於存放函數執行結束的結果
backend = 'redis://127.0.0.1:6379/2'
# 實例化celery對象
app = Celery('test', broker=broker, backend=backend)
# 編寫一個函數,裝飾上celery對象
@app.task
def add(a, b):
import time
time.sleep(3)
print('add函數執行完成')
return a + b
- 2、第二步:再次創建一個py文件(run.py),用於將函數提交給celery
# 1、導入剛纔編寫的函數
from main import add
# 2、將任務提交給broker,函數需要的參數需要傳入
res = add.delay(1, 2)
# 3、提交後可以獲得該任務的ID,可通過ID可以查詢任務執行結果
print(res) # 0213d2c2-453e-41a8-a171-e31f1f2f4883
- 3、第三步:使用命令開啟worker (也可以提前開啟,任務提交後就會直接執行)
# 啟動worker命令,win需要安裝eventlet
# 啟動需要進入main.py文件的目錄下
win:
-4.x之前版本
celery worker -A main -l info -P eventlet
-4.x之後
celery -A main worker -l info -P eventlet
mac:
celery -A main worker -l info
- 4、第四步:worker會將執行的結果存在之前指定的broker目錄下(指定的redis資料庫)
- 5、第五步:通過代碼查看執行結果(創建新的py文件,專門用於查看執行結果)
# 1、導入celery實例的對象
from main import app
# 2、導入該模塊用於查看結果
from celery.result import AsyncResult
# 3、將提交的任務編號拿過來,用於查詢結果
id = '0213d2c2-453e-41a8-a171-e31f1f2f4883'
# 4、指定該文件為啟動文件
if __name__ == '__main__':
# 實例化對象,將任務的ID和celery實例化對象當作參數傳入
a = AsyncResult(id=id, app=app)
# 判斷執行結果
if a.successful(): # 執行完了
result = a.get()
print(result)
elif a.failed():
print('任務失敗')
elif a.status == 'PENDING':
print('任務等待中被執行')
elif a.status == 'RETRY':
print('任務異常後正在重試')
elif a.status == 'STARTED':
print('任務已經開始被執行')
二、Celer包結構
1、創建clery包結構
什麼是包結構:通過將celery服務封裝成包的形式,放在項目需要使用的時候導入即可
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery連接和配置相關文件,且名字必須交celery.py
│ └── tasks.py # 所有任務函數
├── add_task.py # 添加任務
└── get_result.py # 獲取結果
創建包:
創建一個包,名為:celery_task
- 1、第一步:在包下創建py文件(名字必須為celery.py)
# 導入celery模塊
from celery import Celery
# 導入配置broker和backend
from .settings import BACKEND, BROKER
# 實例化celery對象
app = Celery('test',
broker=BROKER,
backend=BACKEND,
include=['celery_task.order_task',
'celery_task.user_task'])
- 2、第二步:創建settings.py,用於存放配置
BROKER = 'redis://127.0.0.1:6379/1'
BACKEND = 'redis://127.0.0.1:6379/2'
- 3、第三步,創建py文件(task.py),用於存放需要執行的非同步任務
# 導入celery實例對象
from .celery import app
# 計算函數
@app.task()
def add(a, b):
print('計算結果為:', a + b)
return True
# 模擬發送簡訊
@app.task()
def send_sms(mobile, code):
print('已向手機號:%s 發送簡訊,驗證碼為:%s' % (mobile, code))
return True
- 4、第四步:開啟worker
切換到celery所在的目錄下,開啟worker命令
celery -A celery_task worker -l info -P eventlet
- 5、第五步:提橋任務: # add_task.py 文件下
# 提交任務,這裡模擬的是非同步任務的提交
res = add.delay(a, b) # 提交後可以接收任務的ID
res1 = send_sms.delay(mobile, code)
- 6、第六步:查看任務執行結果: # get_result.py 文件下
# 導入celery實例
from celery_task.celery import app
from celery.result import AsyncResult
id = res
id1 = res1
# 通過傳入任務的ID就可以查詢到任務的執行結果
def res_func(id):
id = id
a = AsyncResult(id=id, app=app)
if a.successful(): # 執行完了
result = a.get()
if result: return '執行完成'
elif a.failed():
return '任務失敗,失敗的原因可能是未開啟worker'
elif a.status == 'PENDING':
return '任務等待中被執行,當前任務較多或未開啟worker'
elif a.status == 'RETRY':
return '任務異常後正在重試'
elif a.status == 'STARTED':
return '任務已經開始被執行,請稍後查詢'
2、Celery執行非同步任務、延遲任務、定時任務
執行非同步任務:
# 代碼用法:
函數名.delay('函數執行需要的參數')
res = func.delay(*args,**kwargs) # res 用於接收提交任務的ID
執行延遲任務:
# 代碼用法:
# 1、執行延遲任務
from datetime import datetime, timedelta
# 設置延遲後的時間,一分鐘後執行
eat = datetime.utcnow() + timedelta(minutes=1)
# 提交任務
res = send_sms.apply_async(args=['13855411111', '123'], eta=eta)
執行定時任務:
執行定時任務需要啟動beat和worker
- beat:定時提交任務的進程---》配置在app.conf.beat_schedule的任務
- worker:執行任務
- 第一步:在celery的py文件中寫入
# 導入定時需要的模塊
from celery.schedules import crontab
# 第一步:在celery的py文件中寫入
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# celery的配置文件#####
# 任務的定時配置
app.conf.beat_schedule = {
'send_sms': { # 配置執行函數的名字
'task': 'celery_task.task.send_sms', # 導入任務的位置
# 'schedule': timedelta(seconds=3), # 時間對象
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八點
'schedule': crontab(hour=9, minute=43), # 每天9點43
'args': ('18888888', '6666'), # 配置執行函數需要的參數
},
}
- 第二步:啟動beat # 啟動後配配置的任務會自動提交
celery -A celery_task beat -l info
- 第三步:啟動worker # beat提交的任務被被執行
celery -A celery_task worker -l info -P eventlet
三、Django中使用celery
補充:
如果在公司中,只做定時任務有一個框架更簡單一點
使用步驟:
-1 把咱們寫的包,複製到項目目錄下
-luffy_api
-celery_task #celery的包路徑
-luffy_api #源代碼路徑
-2 在使用提交非同步任務的位置,導入使用即可
-視圖函數中使用,導入任務
-任務.delay() # 提交任務
-3 啟動worker,如果有定時任務,啟動beat
-4 等待任務被worker執行
-5 在視圖函數中,查詢任務執行的結果
1、模擬寫一個非同步秒殺任務
後端
view.py
from celery.result import AsyncResult
from celery_task.celery import app
from celery_task.task import sckill_task
# 秒殺介面
class SeckillView(ViewSet):
# 開啟秒殺
@action(methods=['GET'], detail=False)
def seckill(self, request):
# 獲取商品鏈接
goods_id = request.query_params.get('goods_id')
# 將任務提交給worker
res = sckill_task.delay(goods_id)
# 將任務的ID反饋給前端
return APIResponse(task_id=str(res))
# 查詢秒殺結果
@action(methods=['GET'], detail=False)
def get_result(self, request):
# 前端將任務ID產過來,用於接收結果
task_id = request.query_params.get('task_id')
# 調用介面,查詢結果
a = AsyncResult(id=task_id, app=app)
if a.successful():
result = a.get()
if result:
return APIResponse(msg='秒殺成功')
else:
return APIResponse(code=101, msg='手速滿了,秒殺失敗')
elif a.status == 'PENDING':
return APIResponse(code=666, msg='加速秒殺中')
return APIResponse(msg='錯誤')
celery.py ---->秒殺任務
import random
# 秒殺函數
@app.task()
def sckill_task(goods_id):
print('商品正在秒殺中')
time.sleep(random.choice([6, 7, 8, 9]))
print('商品秒殺結束')
return random.choice([True, False])
前端:
<template>
<div>
<button @click="clickHandle">點擊秒殺</button>
</div>
</template>
<script>
export default {
name: "Template",
data() {
return {
// 用於接收任務ID
task_id: '',
// 用戶存放定時任務
t: ''
}
},
methods: {
// 用戶點擊秒殺後發送請求
clickHandle() {
// 向厚點提交秒殺任務
this.$axios.get(this.$settings.BASE_URL + '/user/seckill/seckill/?goods_id=1').then(res => {
// 判斷任務是否提交成功
if (res.data.code == 100) {
// 提交成功會獲取到任務ID
this.task_id = res.data.task_id
// 告知用戶商品正在秒殺中
this.$message('正在秒殺中')
// 啟動一個定時任務,每隔3秒向後端發送請求,獲取任務是否提交成功
this.t = setInterval(res => {
// 定時向後端發送請求,判斷秒殺結果
this.$axios.get(this.$settings.BASE_URL + '/user/seckill/get_result/?task_id=' + this.task_id).then(res => {
// 判斷任務是否結束
if (res.data.code == 666) {
this.$message(res.data.msg)
// 任務結束反饋結果,關閉定時器
} else {
this.$message(res.data.msg)
// 關閉定時器
clearInterval(this.t)
this.t = ''
}
})
}, 3000)
}
})
}
}
}
</script>
2、總結
- 第一步:將celery包複製到項目路徑下
-luffy_api
-celery_task #celery的包路徑
celery.py # 一定不要忘了一句話
import os
# 重點:celery中使用djagno,任務中可能會使用django的orm,緩存,表模型。。。。一定要加
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
-luffy_api #源代碼路徑
- 第二步:在需要使用非同步的地方導入celery實例即可使用
-視圖函數中使用,導入任務
-任務.delay() # 提交任務
-
第三步:啟動worker,如果有定時任務,啟動beat
-
第四步: 等待任務被worker執行
-
第五步:在視圖函數中,查詢任務結果