celery筆記三之task和task的調用

来源:https://www.cnblogs.com/hunterxiong/archive/2023/06/13/17478782.html
-Advertisement-
Play Games

> 本文首發於公眾號:Hunter後端 > 原文鏈接:[celery筆記三之task和task的調用](https://mp.weixin.qq.com/s/AIobDZVDWV3r_XauvmkVKA) 這一篇筆記介紹 task 和 task 的調用。 以下是本篇筆記目錄: 1. 基礎的 task ...


本文首發於公眾號:Hunter後端
原文鏈接:celery筆記三之task和task的調用

這一篇筆記介紹 task 和 task 的調用。

以下是本篇筆記目錄:

  1. 基礎的 task 定義方式
  2. 日誌處理
  3. 任務重試
  4. 忽略任務運行結果
  5. task 的調用

1、基礎的 task 定義方式

前面兩篇筆記中介紹了最簡單的定義方式,使用 @app.task 作為裝飾器:

@app.task
def add(x, y):
    return x + y

如果是在 Django 系統中使用 celery,需要定義一個延時任務或者周期定時任務,可以使用 @shared_task 來修飾

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

在 Django 系統中使用 celery 的方式會在接下來的幾篇筆記中介紹道。

多個裝飾器

如果是 celery 的任務和其他裝飾器一起聯用,記得將 celery 的裝飾器放在最後使用,也就是列表的最前面:

@app.task
@decorator1
@decorator2
def add(x, y):
    return x + y

task名稱

每個 task 都有一個唯一的名稱用來標識這個 task,如果我們在定義的時候不指定,系統會為我們預設一個名稱,這些名稱會在 celery 的 worker 啟動的時候被系統掃描然後輸出一個列表展示。

還是上一篇筆記中我們定義的兩個 task,我們給其中一個指定 name:

#tasks1.py
from .celery import app


@app.task(name="tasks1.add")
def add(x, y):
    return x + y

可以觀察在 celery 的 worker 啟動的時候,會有一個輸出:

[tasks]
  . proj.tasks2.mul
  . tasks1.add

可以看到這個地方,系統就會使用我們定義的 name 了。

2、日誌處理

我們可以在啟動 worker 的時候指定日誌的輸出,定義格式如下:

celery -A proj worker -l INFO --logfile=/Users/hunter/python/celery_log/celery.log

在 task 中的定義可以使用 celery 中方法:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

也可以直接使用 logging 模塊:

import logging

logger1 = logging.getLogger(__name__)

直接在 task 中輸出:

@app.task(name="tasks1.add")
def add(x, y):
    logger.info("this is from logger")
    return x + y

然後在 worker 啟動時指定的日誌文件就會有我們列印出的日誌內容:

[2022-07-24 16:28:33,210: INFO/ForkPoolWorker-7] tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a]: this is from logger
[2022-07-24 16:28:33,224: INFO/ForkPoolWorker-7] Task tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a] succeeded in 0.016244667931459844s: 3

3、任務重試

對於一個 task,我們可以對其設置 retry 參數來指定其在任務執行失敗後會重試幾次,以及隔多長時間重試。

比如對於下麵的 div() 函數,我們來輸入除數為 0 的情況查看重試的功能。

當然,這裡我們是故意輸入參數錯誤,在實際的項目中可能會是其他的原因造成任務失敗,比如資料庫連接失敗等

任務重試的參數也都在 @app.task() 中定義:

# tasks1.py

@app.task(autoretry_for=(Exception, ),  default_retry_delay=10, retry_kwargs={'max_retries': 5})
def div(x, y):
    return x / y

在這裡,autoretry_for 表示的是某種報錯情況下重試,我們定義的 Exception 表示任何錯誤都重試。

如果只是想在某種特定的 exception 情況下重試,將那種 exception 的值替換 Exception 即可。

default_retry_delay 表示重試間隔時長,預設值是 3 * 60s,即三分鐘,是以秒為單位,這裡我們設置的是 10s。

retry_kwargs 是一個 dict,其中有一個 max_retries 參數,表示的是最大重試次數,我們定為 5

然後可以嘗試調用這個延時任務:

from proj.tasks1 import div
div.delay(1, 0)

然後可以看到在日誌文件會有如下輸出:

[2022-07-24 16:59:35,653: INFO/ForkPoolWorker-7] Task proj.tasks1.div[1f65c410-1b2a-4127-9d83-a84b1ad9dd2c] retry: Retry in 10s: ZeroDivisionError('division by zero',)

且每隔 10s 執行一次,一共執行 5 次,5次之後還是不成功則會報錯。

retry_backoff 和 retry_backoff_max

還有一個 retry_backoff 和 retry_backoff_max 參數,這兩個參數是用於這種情況:如果你的 task 依賴另一個 service 服務,比如會調用其他系統的 API,然後這兩個參數可以用於避免請求過多的占用服務。

retry_backoff 參數可以設置成一個 布爾型數據,為 True 的話,自動重試的時間間隔會成倍的增長

第一次重試是 1 s後
第二次是 2s 後
第三次是 4s 後
第四次是 8s 後
...

如果 retry_backoff 參數是一個數字,比如是 3,那麼後續的間隔時間則是 3 的倍數增長

第一次重試 3s 後
第二次是 6s 後
第三次是 12s 後
第四次是 24s 後

retry_backoff_max 是重試的最大的間隔時間,比如重試次數設置的很大,retry_backoff 的間隔時間重覆達到了這個值之後就不再增大了。

這個值預設是 600s,也就是 10分鐘。

我們看一下下麵這個例子:

# tasks1.py

@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_kwargs={'max_retries': 8})
def div(x, y):
    return x / y

關於重試的機制,理論上應該是按照我們前面列出來的重試時間間隔進行重試,但是如果我們這樣直接運行 div.delay(),得出的間隔時間是不定的,是在 0 到 最大值之間得出的一個隨機值。

這樣產生的原因是因為還有一個 retry_jitter 參數,這個參數預設是 True,所以時間間隔會是一個隨機值。

如果需要任務延時的間隔值是按照 retry_backoff 和 retry_backoff_max 兩個設定值來運行,那麼則需要將 retry_jitter 值設為 False。

# tasks1.py

@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_jitter=False, retry_kwargs={'max_retries': 8})
def div(x, y):
    return x / y

然後運行 div 的延時任務,就可以看到延時任務按照規律的間隔時間重試了,以下是日誌:

[2022-07-24 19:00:38,588: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 2s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:40,662: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:40,664: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 4s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:44,744: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:44,746: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 8s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:52,870: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:52,872: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 16s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:09,338: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:09,340: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 32s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:41,843: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:41,845: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:02:21,923: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:02:21,925: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:03:02,001: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:03:02,003: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)

因為我們設置的重試間隔時間最大為 40s,所以這個地方延時間隔時間到了 40 之後,就不再往上繼續增長了。

4、忽略任務運行結果

有時候延時任務的結果我們並不想保存,但是我們配置了 result_backend 參數,這個時候我們有三種方式不保存運行結果。

1.ignore_result=True 不保存任務運行的結果

@app.task(ignore_result=True)
def add(x, y):
    return x + y

2.app.conf 配置

也可以通過 app.conf 的配置來禁用結果的保存:

app.conf.update(
    task_ignore_result=True
)

3.執行單個任務的時候禁用

from proj.tasks1 import add
add.apply_async((1, 2), ignore_result=True)

apply_async() 函數的作用相當於是帶參數的 delay(),或者 delay() 是簡化版的 apply_async(),這個我們下麵會介紹。

5、task 的調用

前面簡單兩個簡單的調用方法,一個是 apply_async(),一個是 delay()。

簡單來說就是 delay() 是不帶參數執行的 apply_async()。

以下用 add() 函數為例介紹一下他們的用法:

delay()

純粹的延時任務,只能如下操作:

add.delay(1, 2)

apply_async()

帶參數的用法,add() 函數的參數用 () 包起來:

add.apply_async((1, 2))

也可以帶其他參數,比如上面介紹的不保存運行結果:

add.apply_async((1, 2), ignore_result=True)

這個函數還可以指定延時的時間:

countdown參數

現在開始 10s 後開始運行:

add.apply_async((1, 2), countdown=10)

eta參數

也可以用 eta 參數來指定 10s 後運行:

from datetime import datetime, timedelta

now = datetime.now()
add.apply_async((1, 2), eta=now + timedelta(seconds=10))

expires參數

這個是用來設置過期的參數:

add.apply_async((1, 2), countdown=60, expires=120)

上面的參數表示,距現在60秒後開始執行,兩分鐘後過期

如果想獲取更多後端相關文章,可掃碼關註閱讀:
image


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 遠程線程註入是最常用的一種註入技術,在應用層註入是通過`CreateRemoteThread`這個函數實現的,通過該函數通過創建線程並調用 `LoadLibrary` 動態載入指定的DLL來實現註入,而在內核層同樣存在一個類似的內核函數`RtlCreateUserThread`,但需要註意的是此函數... ...
  • 生活中我們經常談及 “架構”,那麼到底什麼是 “架構”,Robert C.Martin《架構整潔之道》中的定義:軟體架構是指設計軟體的人為軟體賦予的形狀,這個形狀是指系統如何被劃分為組件 (Components),各個組件如何排列(Arrangement),組件之間如何溝通(Communicatio... ...
  • # 下載 https://www.wxwidgets.org/downloads/ 下載壓縮包即可 ![image](https://img2023.cnblogs.com/blog/916065/202306/916065-20230614040303993-2082032985.png) # 編 ...
  • # 問題背景 nexus3 的 admin 賬號密碼忘記了,需要重置。 # 環境說明 ``` nexus 基於 docker-compose 部署,版本 nexus3.26 docker 鏡像 sonatype/nexus3:3.26.1 ``` # 操作步驟 參考: https://support ...
  • ## 1、求解器 ### 1.1 複製源碼 本案例以icoFoam為例,複製【openFOAM/OpenFOAM-9/applications/solvers/incompressible/icoFoam】文件夾至run文件夾下(我的是【openFOAM/mtl-9/run/solvers/inco ...
  • 某日二師兄參加XXX科技公司的C++工程師開發崗位第13面: > 面試官:什麼是`RAII`? > > 二師兄:`RAII`是`Resource Acquisition Is Initialization`的縮寫。翻譯成中文是資源獲取即初始化。 > > 面試官:`RAII`有什麼特點和優勢? > > ...
  • 在springBoot的啟動類中,提供了一個mai函數的程式入口,來啟動載入SpringBoot程式,那麼註解@SpringBootApplication,通過源碼可以看到,它相當於@ComponentScan + @EnableAutoConfiguration + @SpringBootConf ...
  • C++是一門有著四十年曆史的語言,先後經歷過四次版本大升級(誕生、98、11、17(20),14算小升級)。每次升級都是很多問題和解決方案的取捨。瞭解這些歷史,能更好地幫助我們理清語言的發展脈絡。所以接下來我將借它的發展歷程,談一談我對它的理解,最後給出我認為比較合理的學習路線指南。 ### C++ ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...