分散式任務隊列 1

来源:https://www.cnblogs.com/JcrLive/archive/2020/03/13/12489761.html
-Advertisement-
Play Games

Celery 是一個 基於python開發的分散式非同步消息任務隊列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery, 舉幾個實例場景中可用的例子: 你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程式等著結果返回,而是給你返 ...


Celery 是一個 基於python開發的分散式非同步消息任務隊列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery, 舉幾個實例場景中可用的例子:

  1. 你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程式等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。 
  2. 你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個簡訊祝福

Celery 在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis

 Celery有以下優點:

  1. 簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的
  2. 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
  3. 快速:一個單進程的celery每分鐘可處理上百萬個任務
  4. 靈活: 幾乎celery的各個組件都可以被擴展及自定製

Celery基本工作流程圖

 

 Celery安裝使用

Celery的預設broker是RabbitMQ, 僅需配置一行就可以broker_url = 'amqp://guest:guest@localhost:5672//'

rabbitMQ 沒裝的話請裝一下 https://www.cnblogs.com/JcrLive/p/12469792.html

redis  https://www.cnblogs.com/JcrLive/p/12464962.html

安裝redis組件   pip install -"celery[redis]"

配置

Configuration is easy, just configure the location of your Redis database:

app.conf.broker_url = 'redis://localhost:6379/0'

Where the URL is in the format of:

redis://:password@hostname:port/db_number

all fields after the scheme are optional, and will default to localhost on port 6379, using database 0.

 

如果想獲取每個任務的執行結果,還需要配置一下把任務結果存在哪

If you also want to store the state and return values of tasks in Redis, you should configure these settings:

app.conf.result_backend = 'redis://localhost:6379/0'

安裝celery模塊   pip install celery     task.py

from celery import Celery
 
app = Celery('tasks',
             broker='redis://localhost',
             backend='redis://localhost')
 
@app.task
def add(x,y):
    print("running...",x,y)
    return x+y

啟動Celery Worker來開始監聽並執行任務   進入文件目錄

 celery -A tasks worker --loglevel=info

 

調用任務

再打開一個終端, 進行命令行模式,調用任務  

>>> from tasks import add

>>> add.delay(44) 看你的worker終端會顯示收到 一個任務,此時你想看任務結果的話,需要在調用 任務時 賦值個變數 >>> result = add.delay(44)

The ready() method returns whether the task has finished processing or not:

>>> result.ready()
False

You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

>>> result.get(timeout=1)
8

In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

>>> result.get(propagate=False)

If the task raised an exception you can also gain access to the original traceback:

>>> result.traceback


在項目中如何使用celery

目錄格式如下

proj/__init__.py

    /celery.py     /tasks.py   from __future__ import absolute_import, unicode_literals from celery import Celery   app = Celery('proj',              broker='localhost',              backend='localhost',              include=['proj.tasks'])   # Optional configuration, see the application user guide. app.conf.update(     result_expires=3600, )   if __name__ == '__main__':     app.start()      
from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(*numbers):
    return sum(*numbers)
啟動worker   celery -A proj worker -loglevel info



後臺
celery multi start w1 -A proj -loglevel info
celery  multi restart w1 -A proj -loglevel info
celery multi stop w1
celery multi stopwait w1
 

Celery 定時任務

 celery支持定時任務,設定好任務的執行時間,celery就會定時自動幫你執行, 這個定時任務模塊叫celery beat


from celery import Celery
from celery.schedules import crontab   app = Celery()   @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs):     # Calls test('hello') every 10 seconds.     sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')       # Calls test('world') every 30 seconds     sender.add_periodic_task(30.0, test.s('world'), expires=10)       # Executes every Monday morning at 7:30 a.m.     sender.add_periodic_task(         crontab(hour='*', minute='*', day_of_week='friday'),         test.s('Happy Mondays!'),     )   @app.task def test(arg):     print(arg)
add_periodic_task 會添加一條定時任務

上面是通過調用函數添加定時任務,也可以像寫配置文件 一樣的形式添加, 下麵是每30s執行的任務
app.conf.beat_schedule = {
    'add-every-30-seconds': {         'task''tasks.add',         'schedule'30.0,         'args': (1616)     }, } app.conf.timezone = 'UTC'    
任務添加好了,需要讓celery單獨啟動一個進程來定時發起這些任務, 註意, 這裡是發起任務,不是執行,這個進程只會不斷的去檢查你的任務計劃, 每發現有任務需要執行了,就發起一個任務調用消息,交給celery worker去執行

celery -A periodic_task beat
啟動celery worker來執行任務 celery -A periodic_task worker

更複雜的定時配置  

上面的定時任務比較簡單,只是每多少s執行一個任務,但如果你想要每周一三五的早上8點給你發郵件怎麼辦呢?哈,其實也簡單,用crontab功能,跟linux自帶的crontab功能是一樣的,可以個性化定製任務執行時間

crontab       http://www.cnblogs.com/peida/archive/2013/01/08/2850483.html 

 

還有更多定時配置方式如下:

Example Meaning
crontab() Execute every minute.
crontab(minute=0, hour=0) Execute daily at midnight.
crontab(minute=0, hour='*/3') Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0,
hour='0,3,6,9,12,15,18,21')
Same as previous.
crontab(minute='*/15') Execute every 15 minutes.
crontab(day_of_week='sunday') Execute every minute (!) at Sundays.
crontab(minute='*',
hour='*',day_of_week='sun')
Same as previous.
crontab(minute='*/10',
hour='3,17,22',day_of_week='thu,fri')
Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0,hour='*/2,*/3') Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour='*/5') Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
crontab(minute=0, hour='*/3,8-17') Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0,day_of_month='2') Execute on the second day of every month.
crontab(0, 0,
day_of_month='2-30/3')
Execute on every even numbered day.
crontab(0, 0,
day_of_month='1-7,15-21')
Execute on the first and third weeks of the month.
crontab(0, 0,day_of_month='11',
month_of_year='5')
Execute on the eleventh of May every year.
crontab(0, 0,
month_of_year='*/3')
Execute on the first month of every quarter.

 

上面能滿足你絕大多數定時任務需求了,甚至還能根據潮起潮落來配置定時任務, 具體看 http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#solar-schedules   



您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 我本身的專業是做電子的,因為這個行業的工資挺低的,就算熬個十年的時間,估計也很難工資過萬,而且基本是沒什麼發展的,都是在國企裡面,沒意思,如果沒有關係,根本別想翻身,但是因為還年輕不想一直這樣下去,所以就聽說了這個web前端,當時還不怎麼火,競爭也不大,但是聽說以後會發展的很好,所以就開始學了,當初 ...
  • 在JavaScript中,函數是經常用到的,在實際開發的時候,我想很多人都沒有太在意函數的聲明與函數表達式的區別,但是呢,這種細節的東西對於學好js是非常重要的。 函數聲明與函數表達式用代碼寫出來是這樣的: //函數聲明 function say(){ console.log("函數聲明") } / ...
  • Node.js Domain(域) 簡化非同步代碼的異常處理,可以捕捉處理try catch無法捕捉的異常。 Domain 模塊可分為隱式綁定和顯式綁定: 隱式綁定: 把在domain上下文中定義的變數,自動綁定到domain對象 顯式綁定: 把不是在domain上下文中定義的變數,以代碼的方式綁定到 ...
  • 程式名稱:功夫滑鼠KongFuMouse聯繫郵箱:[email protected]工程版本:Ver0.1.9版本狀態:工程版本,尚未發佈,敬請期待! 軟體介紹: 還在為一些固定化滑鼠點擊煩惱?還在為頻繁點擊滑鼠點擊煩惱?功夫滑鼠解放您的雙手!!!功夫滑鼠2020是一款功能強大的滑鼠自動執行軟體(滑鼠自動 ...
  • 從程式員往架構師轉型的路上,蔡學鏞老師總結的“四維架構設計方法論”對我頗有幫助,讓我對架構設計有了更加立體化、系統化的認知,現將學習心得分享出來給需要的小伙伴參考。這套方法論通過空間(X、Y、Z)三個維度及時間T維度將問題域解構成可以輕鬆應對的小方塊,分而治之。同時,空間(X、Y、Z)三個維度聯動,... ...
  • 一、動態HTML 1.爬蟲跟反爬蟲 2.動態HTML連載 (1)JavaScript (2)jQuery (3)Ajax (4)DHTML (5)Python採集動態數據 從JavaScript代碼入手採集​;Python第三方庫運行JavaScript,直接採集你在瀏覽器中看到的頁面 二、Sele ...
  • [TOC] getattr詳解 前言 這兩天在優化騰訊雲遷移平臺( "SmartMS" )的中間件( )時. 其中某些介面由於涉及多種伺服器系統類型, 遷移類型的判斷.導致往往一個介面動輒70 80行. 隨便進行一個介面的修改, 調試, 參數的變更. 都將花費好幾分鐘的時間去縷縷中間的邏輯.加上同一 ...
  • 靜態類型和動態類型、類型虛函數與多態、typeid、dynamic_cast、static_cast關鍵字的使用場合 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...