Django使用Celery非同步任務隊列

来源:https://www.cnblogs.com/StitchSun/archive/2018/03/12/8552488.html
-Advertisement-
Play Games

1 Celery簡介 Celery是非同步任務隊列,可以獨立於主進程運行,在主進程退出後,也不影響隊列中的任務執行。 任務執行異常退出,重新啟動後,會繼續執行隊列中的其他任務,同時可以緩存停止期間接收的工作任務,這個功能依賴於消息隊列(MQ、Redis)。 1.1 Celery原理 Celery的架構 ...


1  Celery簡介

Celery是非同步任務隊列,可以獨立於主進程運行,在主進程退出後,也不影響隊列中的任務執行。

任務執行異常退出,重新啟動後,會繼續執行隊列中的其他任務,同時可以緩存停止期間接收的工作任務,這個功能依賴於消息隊列(MQ、Redis)。

 

1.1  Celery原理

 

 

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

  • 消息中間件:Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQRedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ。推薦使用:RabbitMQ、Redis作為消息隊列。
  • 任務執行單元:Worker是Celery提供的任務執行的單元,worker併發的運行在分散式的系統節點中。
  • 任務結果存儲:Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

1.2     Celery適用場景

  • 非同步任務處理:例如給註冊用戶發送短消息或者確認郵件任務。
  • 大型任務:執行時間較長的任務,例如視頻和圖片處理,添加水印和轉碼等,需要執行任務時間長。
  • 定時執行的任務:支持任務的定時執行和設定時間執行。例如性能壓測定時執行。

 

2      Celery開發環境準備

2.1     環境準備

軟體名稱

版本號

說明

Linux

Centos 6.5(64bit)

操作系統

Python

3.5.2

 

Django

1.10

Web框架

Celery

4.0.2

非同步任務隊列

Redis

2.4

消息隊列

 

2.2     Celery安裝

使用方法介紹:

Celery的運行依賴消息隊列,使用時需要安裝redis或者rabbit。

這裡我們使用Redis。安裝redis庫:

sudo yum install redis

  

啟動redis:

sudo service redis start

 

安裝celery庫

sudo pip install celery==4.0.2

 

3      Celery單獨執行任務

3.1     編寫任務

創建task.py文件

說明:這裡初始Celery實例時就載入了配置,使用的redis作為消息隊列和存儲任務結果。

 

 

運行celery:

$ celery -A task worker --loglevel=info

看到下麵的列印,說明celery成功運行。

 

3.2     調用任務

 直接打開python交互命令行

 執行下麵代碼:

 

可以celery的視窗看到任務的執行信息

 

 

任務執行狀態監控和獲取結果:

 

 

 

3.3     任務調用方法總結

有兩種方法:

delay和apply_async ,delay方法是apply_async簡化版。

add.delay(2, 2)
add.apply_async((2, 2))
add.apply_async((2, 2), queue='lopri')

 

delay方法是apply_async簡化版本。

apply_async方法是可以帶非常多的配置參數,包括指定隊列等

  • Queue 指定隊列名稱,可以把不同任務分配到不同的隊列

 

3.4     任務狀態

每個任務有三種狀態:

PENDING -> STARTED -> SUCCESS

 

任務查詢狀態:

res.state

 

來查詢任務的狀態

 

 

4      與Django集成

上面簡單介紹了celery非同步任務的基本方法,結合我們實際的應用,我們需要與Django一起使用,下麵介紹如何與Django結合。

4.1     與Django集成方法

與Django集成有兩種方法:

  • Django 1.8 以上版本:與Celery 4.0版本集成
  • Django 1.8 以下版本:與Celery3.1版本集成,使用django-celery庫

 

今天我們介紹celery4.0 和django 1.8以上版本集成方法。

4.2     創建項目文件

創建一個項目:名字叫做proj

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
  - proj/wsgi.py
- manage.py

 

創建一個新的文件:proj/proj/mycelery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
 
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
 
app = Celery('proj')
 
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
 
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

 

在proj/proj/__init__.py:添加

from __future__ import absolute_import, unicode_literals
 
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .mycelery import app as celery_app
 
__all__ = ['celery_app']

 

4.3     配置Celery

我們在mycelery.py文件中說明celery的配置文件在settings.py中,並且是以CELERY開頭。

   
app.config_from_object('django.conf:settings', namespace='CELERY')

 

在settings.py文件中添加celery配置:

 

 

我們的配置是使用redis作為消息隊列,消息的代理和結果都是用redis,任務的序列化使用json格式。

重要:redis://127.0.0.1:6379/0這個說明使用的redis的0號隊列,如果有多個celery任務都使用同一個隊列,則會造成任務混亂。最好是celery實例單獨使用一個隊列。

4.4     創建APP

創建Django的App,名稱為celery_task,在app目錄下創建tasks.py文件。

完成後目錄結構為:

├── celery_task
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   ├── tests.py
│   └── views.py
├── db.sqlite3
├── manage.py
├── proj
│   ├── celery.py
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

 

4.5     編寫task任務

編輯任務文件

tasks.py

在tasks.py文件中添加下麵代碼

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)

 

啟動celery:

celery -A proj.mycelery worker -l info

 

說明:proj為模塊名稱,mycelery為celery的實例所在的文件。

啟動成功列印:

 

 

4.6     在views中調用任務

在views中編寫介面,實現兩個功能:

  • 觸發任務,然後返回任務的結果和任務ID
  • 根據任務ID查詢任務狀態

代碼如下:

 

 

啟動django。

新開一個會話啟動celery;啟動命令為:

celery –A proj.mycelery worker –l info

 

訪問http://127.0.0.1:8000/add,可以看到返回的結果。

 

 

在celery運行的頁面,可以看到下麵輸出:

 

 

4.7     在views中查詢任務狀態

有的時候任務執行時間較長,需要查詢任務是否執行完成,可以根據任務的id來查詢任務狀態,根據狀態進行下一步操作。

可以看到任務的狀態為:SUCCESS

 

 

5      Celery定時任務

Celery作為非同步任務隊列,我們可以按照我們設置的時間,定時的執行一些任務,例如每日資料庫備份,日誌轉存等。

Celery的定時任務配置非常簡單:

定時任務的配置依然在setting.py文件中。

說明:如果覺得celery的數據配置文件和Django的都在setting.py一個文件中不方便,可以分拆出來,只需要在mycelery.py的文件中指明即可。

app.config_from_object('django.conf:yoursettingsfile', namespace='CELERY')

 

 

5.1     任務間隔運行

#每30秒調用task.add
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16)
    },
}

 

5.2     定時執行

定時每天早上7:30分運行。

註意:設置任務時間時註意時間格式,UTC時間或者本地時間。

#crontab任務
#每天7:30調用task.add
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30),
        'args': (16, 16),
    },
}

 

5.3     定時任務啟動

配置了定時任務,除了worker進程外,還需要啟動一個beat進程。

Beat進程的作用就相當於一個定時任務,根據配置來執行對應的任務。

5.3.1  啟動beat進程

命令如下:

celery -A proj.mycelery beat -l info

 

 

5.3.2  啟動worker進程

Worker進程啟動和前面啟動命令一樣。

 

celery –A proj.mycelery worker –l info

 

6      Celery深入

Celery任務支持多樣的運行模式:

  • 支持動態指定併發數 --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
  • 支持鏈式任務
  • 支持Group任務
  • 支持任務不同優先順序
  • 支持指定任務隊列
  • 支持使用eventlet模式運行worker

例如:指定併發數為1000

celery -A proj.mycelery worker -c 1000

 

這些可以根據使用的深入自行瞭解和學習。

 

 

7      參考資料

Celery官網:

http://docs.celeryproject.org/en/latest/index.html

Celery與Django:

http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#next-steps

celery定時任務:

http://blog.csdn.net/sicofield/article/details/50937338


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • num= 1 #值 =1while num <= 10 : # num(1)小於10 print(num) # 應該列印 這個1的值 num +=1 # num+=1等價於 num再加1 所以這個值一共是2 if num == 6: #如果為6 這個程式終止,如果沒有到6,那麼程式將會一遍一遍的走直 ...
  • 一、前言 1.1.環境 python版本:3.6 Django版本:1.11.6 1.2.預覽效果 最終搭建的blog的樣子,基本上滿足需求了。框架搭好了,至於CSS,可以根據自己喜好隨意搭配。 二、建立博客應用 2.1.建立項目和應用 創建工程blogproject 創建blog應用 打開 blo ...
  • 最近在寫一個項目的時候需要用到MonogoDB,存儲經緯度坐標的(貌似MongoBD乾這個比較專業),由於沒有玩過MongoBD,就跟著教程來整合這個東西,用的是SpringBoot來整合SpringData和MongoDB,大概是由於版本等原因 教程里是這樣寫的,完全沒毛病。 但是自己寫的時候就出 ...
  • 字典(dictionary) 字典是另一種可變容器模型,且可存儲任意類型對象。 字典的每個鍵值 key= value 對用冒號:分割,每個鍵值對之間用逗號,分割,整個字典包括在花括弧 {} 中 ,格式如下所示: 訪問字典里的值 字典遍歷 第一種方法:key遍歷 第二種方法:元素遍歷 判斷key是否存 ...
  • 序列 序列:數學上,序列是被排成一列的對象(或事件);這樣,每個元素不是在器他元素之前,就是在其他元素之後。這裡元素之間的順序非常重要。《維基百科》 序列是Python中最基本的數據結構。序列中的每個元素都分配一個數字 - 它的位置,或索引,第一個索引是0,第二個索引是1,依此類推。 Python有 ...
  • Description 已知va和vb分別為非遞減有序線性表,將va和vb進行合併為新的線性表vc,並保持vc仍然非遞減有序。 本題中,線性表元素為整數。線性表的最大長度為1000。 Input 輸入數據有多組,第一行為測試數據的組數t,接下來為2t行,每一組測試數據有兩行: 第一行的第一個數為va ...
  • 編寫第一個Java程式 完成工作:1.在文本編輯器中輸入一個Java程式。 2.使用括弧組織程式。 3.保存、編譯和運行程式。 1 package com.Jsample;//將程式的包名稱命名為com.Jsample 2 3 public class Helloworld {//將程式(類)命名為 ...
  • 列表函數 追加和擴展 list.append() 在列表末尾追加新的對象 extend()在列表末尾一次性追加另一個序列中的多個值(用新列表擴展原來的列表) 其他函數 count() 統計某個元素在列表中出現的次數 index() 從列表中找出某個值第一個匹配項的索引位置 insert() 將對象插 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...