Celery框架從入門到精通

来源:https://www.cnblogs.com/kangssssh/archive/2023/03/09/17201349.html
-Advertisement-
Play Games

Celery介紹、安裝、基本使用 一、Celery服務 什麼是Celery: Celery是一個簡單、靈活且可靠的,處理消息的分散式系統 Celery可以用來做什麼: 非同步任務 定時任務 延遲任務 Celery的運行原理: 可以不依賴任何服務,通過自身命令,啟動服務 celery服務為其他項目服務提 ...


目錄

Celery介紹、安裝、基本使用

一、Celery服務

什麼是Celery:

Celery是一個簡單、靈活且可靠的,處理消息的分散式系統

  • Celery可以用來做什麼:
    • 非同步任務
    • 定時任務
    • 延遲任務

Celery的運行原理:

  • 可以不依賴任何服務,通過自身命令,啟動服務
  • celery服務為其他項目服務提供非同步解決任務需求
# 註:會有兩個服務同時運行
    - 項目服務
    - celery服務
    項目服務將需要非同步處理的任務交給celery服務,celery就會在需要時非同步完成項目的需求
    
    
'''
人是一個獨立運行的服務 | 醫院也是一個獨立運行的服務
	正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題
	人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立運行,人生病時,醫院就來解決人生病的需求

'''

1、celery架構

  • 消息中間件:broker

    • 提交的任務【函數】都放在這裡, celery本身不能提供消息中間件
    • 需要藉助於第三方: redis或rabbitmq
  • 任務執行單元:worker

    • 真正執行任務的的地方,一個個進程中執行函數
  • 結果儲存:backend

    • 函數return的結果都存儲在這裡, celery本身不提供結果存儲
    • 需要藉助於第三方: redis或rabbitmq

使用場景:

  • 非同步執行:解決耗時任務
  • 延遲執行:解決延遲任務
  • 定時執行:解決周期任務

2、celery快速使用

Celery不支持在windows上直接運行,通過eventlet支持在win上運行

安裝:

pip install celery
pip install eventlet  # windows需要安裝 

快速使用:

- 1、第一步:創建一個py文件(main.py),用於實例化celery對象,編寫需要執行的函數
    # 1、導入模塊
    from celery import Celery

    # 2、指定briker,用於存放提交的非同步任務
    broker = 'redis://127.0.0.1:6379/1'
    # 3、指定backend,用於存放函數執行結束的結果
    backend = 'redis://127.0.0.1:6379/2'
    # 實例化celery對象
    app = Celery('test', broker=broker, backend=backend)


    # 編寫一個函數,裝飾上celery對象
    @app.task
    def add(a, b):
        import time
        time.sleep(3)
        print('add函數執行完成')
        return a + b
       
- 2、第二步:再次創建一個py文件(run.py),用於將函數提交給celery
    # 1、導入剛纔編寫的函數
    from main import add

    # 2、將任務提交給broker,函數需要的參數需要傳入
    res = add.delay(1, 2)
    # 3、提交後可以獲得該任務的ID,可通過ID可以查詢任務執行結果
    print(res)  # 0213d2c2-453e-41a8-a171-e31f1f2f4883
    
  
- 3、第三步:使用命令開啟worker (也可以提前開啟,任務提交後就會直接執行)
	# 啟動worker命令,win需要安裝eventlet
	# 啟動需要進入main.py文件的目錄下
	win:
		-4.x之前版本
			celery worker -A main -l info -P eventlet
		-4.x之後
			celery  -A main  worker -l info -P eventlet
 	mac:
		celery  -A main  worker -l info

- 4、第四步:worker會將執行的結果存在之前指定的broker目錄下(指定的redis資料庫)
	
- 5、第五步:通過代碼查看執行結果(創建新的py文件,專門用於查看執行結果)
    # 1、導入celery實例的對象
    from main import app
    # 2、導入該模塊用於查看結果
    from celery.result import AsyncResult

    # 3、將提交的任務編號拿過來,用於查詢結果
    id = '0213d2c2-453e-41a8-a171-e31f1f2f4883'

    # 4、指定該文件為啟動文件
    if __name__ == '__main__':
        # 實例化對象,將任務的ID和celery實例化對象當作參數傳入
        a = AsyncResult(id=id, app=app)
        # 判斷執行結果
        if a.successful():  # 執行完了
            result = a.get()
            print(result)
        elif a.failed():
            print('任務失敗')
        elif a.status == 'PENDING':
            print('任務等待中被執行')
        elif a.status == 'RETRY':
            print('任務異常後正在重試')
        elif a.status == 'STARTED':
            print('任務已經開始被執行')

二、Celer包結構

1、創建clery包結構

什麼是包結構:通過將celery服務封裝成包的形式,放在項目需要使用的時候導入即可

project
    ├── celery_task  	  # celery包
    │   ├── __init__.py  # 包文件
    │   ├── celery.py   # celery連接和配置相關文件,且名字必須交celery.py
    │   └── tasks.py   # 所有任務函數
    ├── add_task.py  	 # 添加任務
    └── get_result.py   # 獲取結果

創建包:

創建一個包,名為:celery_task

- 1、第一步:在包下創建py文件(名字必須為celery.py)
    # 導入celery模塊
    from celery import Celery
    # 導入配置broker和backend
    from .settings import BACKEND, BROKER

    # 實例化celery對象
    app = Celery('test',
                 broker=BROKER, 
                 backend=BACKEND,
                 include=['celery_task.order_task', 
                          'celery_task.user_task'])

- 2、第二步:創建settings.py,用於存放配置
    BROKER = 'redis://127.0.0.1:6379/1'
    BACKEND = 'redis://127.0.0.1:6379/2'
    
- 3、第三步,創建py文件(task.py),用於存放需要執行的非同步任務
    # 導入celery實例對象
    from .celery import app

    # 計算函數
    @app.task()
    def add(a, b):
        print('計算結果為:', a + b)
        return True

    # 模擬發送簡訊
    @app.task()
    def send_sms(mobile, code):
        print('已向手機號:%s 發送簡訊,驗證碼為:%s' % (mobile, code))
        return True
    
- 4、第四步:開啟worker
	切換到celery所在的目錄下,開啟worker命令
	celery -A celery_task worker -l info -P eventlet
	

- 5、第五步:提橋任務: # add_task.py 文件下
    # 提交任務,這裡模擬的是非同步任務的提交
    res = add.delay(a, b)  # 提交後可以接收任務的ID
    res1 = send_sms.delay(mobile, code)
    
                                
- 6、第六步:查看任務執行結果: # get_result.py 文件下
    # 導入celery實例
    from celery_task.celery import app
    from celery.result import AsyncResult
    
	 id = res
    id1 = res1
		
    # 通過傳入任務的ID就可以查詢到任務的執行結果
    def res_func(id):
        id = id
        a = AsyncResult(id=id, app=app)
        if a.successful():  # 執行完了
            result = a.get()
            if result: return '執行完成'
        elif a.failed():
            return '任務失敗,失敗的原因可能是未開啟worker'
        elif a.status == 'PENDING':
            return '任務等待中被執行,當前任務較多或未開啟worker'
        elif a.status == 'RETRY':
            return '任務異常後正在重試'
        elif a.status == 'STARTED':
            return '任務已經開始被執行,請稍後查詢'

2、Celery執行非同步任務、延遲任務、定時任務

執行非同步任務:

# 代碼用法:
	函數名.delay('函數執行需要的參數')
	res = func.delay(*args,**kwargs)   # res 用於接收提交任務的ID

執行延遲任務:

# 代碼用法:
    # 1、執行延遲任務
    from datetime import datetime, timedelta

    # 設置延遲後的時間,一分鐘後執行
    eat = datetime.utcnow() + timedelta(minutes=1)

    # 提交任務
    res = send_sms.apply_async(args=['13855411111', '123'], eta=eta)

執行定時任務:

執行定時任務需要啟動beat和worker

  • beat:定時提交任務的進程---》配置在app.conf.beat_schedule的任務
  • worker:執行任務
- 第一步:在celery的py文件中寫入
    # 導入定時需要的模塊
    from celery.schedules import crontab
    # 第一步:在celery的py文件中寫入
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    # celery的配置文件#####
    # 任務的定時配置
    app.conf.beat_schedule = {
        'send_sms': {  # 配置執行函數的名字
            'task': 'celery_task.task.send_sms',  # 導入任務的位置
            # 'schedule': timedelta(seconds=3),  # 時間對象
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八點
            'schedule': crontab(hour=9, minute=43),  # 每天9點43
            'args': ('18888888', '6666'),  # 配置執行函數需要的參數
        },
    }

    
- 第二步:啟動beat  # 啟動後配配置的任務會自動提交
	celery -A celery_task beat -l info
    
- 第三步:啟動worker  # beat提交的任務被被執行
	celery -A celery_task worker -l info -P eventlet
    

三、Django中使用celery

補充:

如果在公司中,只做定時任務有一個框架更簡單一點

使用步驟:

-1 把咱們寫的包,複製到項目目錄下
    -luffy_api
        -celery_task #celery的包路徑
        -luffy_api  #源代碼路徑

-2 在使用提交非同步任務的位置,導入使用即可
    -視圖函數中使用,導入任務
    -任務.delay()  # 提交任務


-3 啟動worker,如果有定時任務,啟動beat

-4 等待任務被worker執行

-5 在視圖函數中,查詢任務執行的結果

1、模擬寫一個非同步秒殺任務

後端

view.py

from celery.result import AsyncResult
from celery_task.celery import app
from celery_task.task import sckill_task


# 秒殺介面
class SeckillView(ViewSet):

    # 開啟秒殺
    @action(methods=['GET'], detail=False)
    def seckill(self, request):
        # 獲取商品鏈接
        goods_id = request.query_params.get('goods_id')
        # 將任務提交給worker
        res = sckill_task.delay(goods_id)
        # 將任務的ID反饋給前端
        return APIResponse(task_id=str(res))

    # 查詢秒殺結果
    @action(methods=['GET'], detail=False)
    def get_result(self, request):
        # 前端將任務ID產過來,用於接收結果
        task_id = request.query_params.get('task_id')
        # 調用介面,查詢結果
        a = AsyncResult(id=task_id, app=app)
        if a.successful():
            result = a.get()
            if result:
                return APIResponse(msg='秒殺成功')
            else:
                return APIResponse(code=101, msg='手速滿了,秒殺失敗')
        elif a.status == 'PENDING':
            return APIResponse(code=666, msg='加速秒殺中')
        return APIResponse(msg='錯誤')

celery.py ---->秒殺任務

import random


# 秒殺函數
@app.task()
def sckill_task(goods_id):
    print('商品正在秒殺中')
    time.sleep(random.choice([6, 7, 8, 9]))
    print('商品秒殺結束')
    return random.choice([True, False])

前端:

<template>
  <div>
    <button @click="clickHandle">點擊秒殺</button>
  </div>
</template>

<script>
export default {
  name: "Template",
  data() {
    return {
      // 用於接收任務ID
      task_id: '',
      // 用戶存放定時任務
      t: ''
    }
  },
  methods: {
    // 用戶點擊秒殺後發送請求
    clickHandle() {
      // 向厚點提交秒殺任務
      this.$axios.get(this.$settings.BASE_URL + '/user/seckill/seckill/?goods_id=1').then(res => {
        // 判斷任務是否提交成功
        if (res.data.code == 100) {
          // 提交成功會獲取到任務ID
          this.task_id = res.data.task_id
          // 告知用戶商品正在秒殺中
          this.$message('正在秒殺中')
          // 啟動一個定時任務,每隔3秒向後端發送請求,獲取任務是否提交成功
          this.t = setInterval(res => {
            // 定時向後端發送請求,判斷秒殺結果
            this.$axios.get(this.$settings.BASE_URL + '/user/seckill/get_result/?task_id=' + this.task_id).then(res => {
              // 判斷任務是否結束
              if (res.data.code == 666) {
                this.$message(res.data.msg)
                // 任務結束反饋結果,關閉定時器
              } else {
                this.$message(res.data.msg)
                // 關閉定時器
                clearInterval(this.t)
                this.t = ''
              }
            })
          }, 3000)
        }
      })
    }
  }
}
</script>

2、總結

  • 第一步:將celery包複製到項目路徑下
-luffy_api
    -celery_task #celery的包路徑
    celery.py  # 一定不要忘了一句話
        import os
         # 重點:celery中使用djagno,任務中可能會使用django的orm,緩存,表模型。。。。一定要加
        os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
        -luffy_api  #源代碼路徑
  • 第二步:在需要使用非同步的地方導入celery實例即可使用
-視圖函數中使用,導入任務
-任務.delay()  # 提交任務
  • 第三步:啟動worker,如果有定時任務,啟動beat

  • 第四步: 等待任務被worker執行

  • 第五步:在視圖函數中,查詢任務結果


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • find,grep,sed,awk find:常用在目錄下精確查找文件(最擅長找文件) grep:常用來做全局數據的查詢定位(最擅長文本過濾) sed:常用來做行數據增刪改查(最擅長取行) awk:常用來做列數據切分與提取(最擅長取列) 1.find【擅長在目錄下找文件】 find 命令用來在指定目 ...
  • 先看如下一個DEMO示例代碼:(其中doBatchGet被子類重寫了1次) public abstract class BaseDemoService<T> { public String batchGet(T... ints) { T one=ints[0]; System.out.println ...
  • 前置知識: Web 伺服器:可以指硬體上的,也可以指軟體上的。從硬體的角度來說, Web 伺服器指的就是一臺存儲了網路服務軟體的電腦;從軟體的角度來說, Web 伺服器指的是一種軟體,比如 Tomcat。 Servlet 容器:目前主流的 Servlet 容器軟體包括 Tomcat、Jetty、J... ...
  • SpringBoot Controller 控制器 SpringBoot提供了@Controller和@RestController兩種註解來標識此類負責接收和處理HTTP請求。 如果請求的是頁面和數據,使用@Controller註解即可;如果只是請求數據,則可以使用@RestController註 ...
  • 1.學習目標 2.簡介 技術論壇:http://bbs.chinaunix.net/forum-240-1.html 資源地址:https://sourceforge.net/projects/fastdfs/ 源碼地址:https://github.com/happyfish100 FastDFS ...
  • 前言 對於大多數 maven 多模塊化工程,可以使用 Jacoco 這款工具,關於 Jacoco 這款工具,ChatGPT 對它的描述是這樣的: JaCoCo(Java Code Coverage)是一個開源的測試覆蓋率工具,它可以用於幫助開發人員衡量其軟體測試的有效性。它支持多種語言,包括 Jav ...
  • 1、Spring 1.1、簡介 Spring:春天 >給軟體行業帶來了春天! 2002,首次推出了Spring框架的雛形:interf21框架! Spring框架即以interface21框架為基礎,經過重新設計,並不斷豐富其內涵,於2004年3月24日發佈了1.0正式版。 Rod Johnson ...
  • 之前給大家寫過如何將 ChatGPT 接入微信和釘釘,沒看過的可以往公眾號前面的文章翻翻,最近又發現了一個有趣的玩法,周末找時間實現了一下,感覺挺不錯的,分享給大家。 背景 事情的起因是阿粉在朋友圈看到了這樣一條信息,敏感信息已經去掉了,意思很明顯就是將 OpenAI 接入到知識星球了,用戶可以通過 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...