celery筆記六之worker介紹

来源:https://www.cnblogs.com/hunterxiong/archive/2023/06/23/17500482.html
-Advertisement-
Play Games

> 本文首發於公眾號:Hunter後端 > 原文鏈接:[celery筆記六之worker介紹](https://mp.weixin.qq.com/s/Ck_7cEz6dldN12OmYzFg9Q) 前面我們介紹過 celery 的理想的設計方式是幾個 worker 處理特定的任務隊列的數據,這樣可以 ...


本文首發於公眾號:Hunter後端
原文鏈接:celery筆記六之worker介紹

前面我們介紹過 celery 的理想的設計方式是幾個 worker 處理特定的任務隊列的數據,這樣可以避免任務在隊列中的積壓。

這一篇筆記我們介紹一下如何使用 worker 提高系統中任務的處理效率。

  1. worker啟動
  2. worker與隊列
  3. worker檢測
  4. 其他worker命令

1、worker 啟動

前面介紹過 worker 的啟動方式,在 celery 配置文件的上一級目錄運行下麵的命令:

celery -A hunter worker -l INFO

其中,-l 表示日誌等級,相當於是 --loglevel=INFO

celery -A hunter worker --loglevel=INFO

指定worker的hostname

celery -A hunter worker -l INFO -n worker1@%h

其中,%h 表示主機名,包含功能變數名稱在內,%n 表示僅包含主機名,%d 表示僅包含功能變數名稱。

以下是示例:

變數 示例 結果
%h worker1@%h [email protected]
%n worker1@%n worker1@george
%d worker1@%d [email protected]

指定日誌文件地址

logfile 參數可以指定日誌文件地址:

celery -A hunter worker --loglevel=INFO --logfile=/Users/hunter/python/celery_log/celery.log

殺死 worker 進程

我們可以通過獲取 worker 的進程 id 來殺死這些進程:

ps aux | grep 'celery -A hunter' | awk '{print $2}' |xargs sudo kill -9

併發處理

一般來說,當我們直接啟動 worker 的時候,會預設同時起好幾個 worker 進程。

如果不指定 worker 的數量,worker 的進程會預設是所在機器的 CPU 的數量。

我們也可以通過 concurrency 參數來指定啟動 worker 的進程數。

比如說,我們想啟動三個 worker 的進程,可以如下指定:

celery -A hunter worker --concurrency=3 -l INFO

--concurrency 也可以簡寫成 -c:

celery -A hunter worker -c 3 -l INFO

這樣,我們在啟動的命令行里輸入下麵的參數就可以看到啟動了三個 worker 的進程:

ps aux |grep 'celery -A hunter'

這裡有一個關於 worker 進程數啟動多少的問題,是不是我們的 worker 啟動的越多,我們的定時任務和延時任務就會執行得越快呢?

並不是,有實驗證明 worker 的數量啟動得越多,對於 task 處理的性能有可能還會起到一個反向作用,這裡不作展開討論,我們可以設置 CPU 的數量即可。

當然,你也可以根據 worker 處理任務的情況,基於 application,基於工作負載,任務運行時間等試驗出一個最佳的數量。

2、worker與隊列

消費指定隊列的task

我們可以在運行 worker 的時候指定 worker 只消費特定隊列的 task,這個特定隊列,可以是一個,也可以是多個,用逗號分隔開。

指定的方式如下:

celery -A hunter worker -l INFO -Q queue_1,queue_2

列出所有活躍的queues

下麵的命令可以列出所有系統活躍的隊列信息:

celery -A hunter inspect active_queues

假設目前我們相關配置如下:

app.conf.task_queues = (
    Queue('default_queue',),
    Queue('queue_1'),
    Queue('queue_2'),
)

app.conf.task_routes = {
    'blog.tasks.add': {
        'queue': 'queue_1',
    },
    'blog.tasks.minus': {
        'queue': 'queue_2',
    },
}

我們這樣啟動worker:

celery -A hunter worker -l INFO -c 3 -n worker1@%h

然後運行上面的查看隊列命令:

celery -A hunter inspect active_queues

可以看到如下輸出:

->  worker1@localhost: OK
    * {'name': 'default_queue', 'exchange': {...}, 'routing_key': 'default_queue', ...}
    * {'name': 'queue_1', 'exchange': {...}, 'routing_key': 'default_queue', ...}
    * {'name': 'queue_2', 'exchange': {...}, 'routing_key': 'default_queue', ...}

1 node online.

其中,輸出結果最上面的 worker1@localhost 就是我們啟動 worker 通過 -n 指定的 hostnam,可以通過這個來指定 worker。

我們可以指定 worker 輸出對應的隊列數據:

celery -A hunter inspect active_queues -d worker1@localhost

除了命令行,我們也可以在交互界面來獲取這些數據:

# 獲取所有的隊列信息
from hunter.celery import app
app.control.inspect().active_queues()

# 獲取指定 worker 的隊列信息
app.control.inspect(['worker1@localhost']).active_queues()

3、worker 的檢測

app.control.inspect() 函數可以檢測正在運行的 worker 信息,我們可以用下麵的命令來操作:

from hunter.celery import app

i = app.control.inspect()

這個操作是獲取所有節點,我們也可以指定單個或者多個節點檢測:

# 輸入數組參數,表示獲取多個節點worker信息
i = app.control.inspect(['worker1@localhost', 'worker2@localhost'])

# 輸入單個worker名,指定獲取worker信息
i = app.control.inspect('worker1@localhost')

獲取已經註冊的task列表

用到前面的 app.control.inspect() 函數和其下的 registered() 函數

i.registered()

# 輸出結果為 worker 及其下的 task name 
# 輸出示例為 {'worker1@localhost': ['blog.tasks.add', 'blog.tasks.minus', 'polls.tasks.multi']}

輸出的格式是一個 dict,worker 的名稱為 key,task 列表為 value

正在執行的 task

active() 用於獲取正在執行的 task 函數

i.active()

# 輸出 worker 正在執行的 task
# 輸出示例為 {'worker1@localhost': [{'id': 'xxx', 'name': 'blog.tasks.add', 'args': [3, 4], 'hostname': 'worker1@localhost', 'time_start': 1659450162.58197, ..., 'worker_pid': 41167}

輸出的結果也是一個 dict,每個 worker 下有 n 個正在 worker 中執行的 task 信息,這個 n 的最大數量取決於前面我們啟動 worker 時的 --concurrency 參數。

在其中的 task 信息里包含 task_id,task_name,和輸入的參數,開始時間,worker name 等。

即將運行的 task

比如我們運行 add 延時任務,定時在 20s 之後運行:

add.apply_async((1, 1), countdown=20)

返回的結果每個 worker 下有一個任務列表,每個列表存有任務的信息:

i.scheduled()

# 輸出信息如下
# {'worker1@localhost': [{'eta': '2022-08-02T22:56:49.503517+08:00', 'priority': 6, 'request': {'id': '23080c03-a906-4cc1-9ab1-f27890c58adb', 'name': 'blog.tasks.add', 'args': [1, 1], 'kwargs': {}, 'type': 'blog.tasks.add', 'hostname': 'worker1@localhost', 'time_start': None, 'acknowledged': False, 'delivery_info': {...}}]}

queue隊列中等待的 task

如果我們有任務在 queue 中積壓,我們可以使用:

i.reserved()

來獲取隊列中等待的 task 列表

4、其他 worker 命令

ping-pong

檢測 worker 還活著的 worker

使用 ping() 函數,可以得到 pong 字元串的回覆表明該 worker 是存活的。

from hunter.celery import app

app.control.ping(timeout=0.5)

# [{'worker1@localhost': {'ok': 'pong'}}]

我們也可以指定 worker 來操作:

app.control.ping(['worker1@localhost'])

如果你瞭解 redis 的存活檢測操作的話,應該知道在 redis-cli 里也可以執行這個 ping-pong 的一來一回的檢測操作。

如果想獲取更多後端相關文章,可掃碼關註閱讀:
image


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 產品設計出來之後啊,大家使用的時候覺得反過來使用更加便捷。但是屏幕顯示是反的。那怎麼辦那????? 修改硬體費時費工,那能否軟體實現那????? 如果純軟體使用那就太費系統資源了。於是就想到了使用全志R528 自帶的G2D功能(硬體加速功能)。 使用它進行旋轉,後又發現uboot階段系統沒有G2D導 ...
  • 1. 背景 日誌領域是Elasticsearch(ES)最重要也是規模最大的應用場景之一。這得益於 ES 有高性能倒排索引、靈活的 schema、易用的分散式架構,支持高吞吐寫入、高性能查詢,同時有強大的數據治理生態、端到端的完整解決方案。但原生 ES 在高吞吐寫入、低成本存儲、高性能查詢等方面還有 ...
  • 上一篇文章 [我在 vscode 插件里接入了 ChatGPT,解決了代碼變數命名的難題](https://www.cnblogs.com/jaycewu/p/17476198.html) 中,展示瞭如何在 vscode 插件中使用 ChatGPT 解決代碼變數命名的問題。vscode 插件市場中有 ...
  • 博客推行版本更新,成果積累制度,已經寫過的博客還會再次更新,不斷地琢磨,高質量高數量都是要追求的,工匠精神是學習必不可少的精神。因此,大家有何建議歡迎在評論區踴躍發言,你們的支持是我最大的動力,你們敢投,我就敢肝 ...
  • 今天我們來談一談系統複雜度的根源之【高性能】 對性能的不懈追求一直是人類科技持續發展的核心動力。例如電腦,從電子管電腦到晶體管電腦,再到集成電路電腦,運算性能從每秒幾次提高到每秒幾億次。然而,隨著性能的提升,相應的方法和系統複雜度也逐漸增加。現代電腦CPU集成了數億顆晶體管,其邏輯複雜度和 ...
  • 博客推行版本更新,成果積累制度,已經寫過的博客還會再次更新,不斷地琢磨,高質量高數量都是要追求的,工匠精神是學習必不可少的精神。因此,大家有何建議歡迎在評論區踴躍發言,你們的支持是我最大的動力,你們敢投,我就敢肝 ...
  • kkfileview是一個開源的文件文檔線上預覽項目解決方案。該項目使用流行的spring boot搭建,易上手和部署以及二次開發,並提供Docker鏡像發行包,方便在容器環境部署。基本支持主流辦公文檔的線上預覽。支持word excel ppt,pdf等辦公文檔支持txt,java,php,py,... ...
  • 某日二師兄參加XXX科技公司的C++工程師開發崗位第23面: > 面試官:`vector`瞭解嗎? > > 二師兄:嗯,用過。 > > 面試官:那你知道`vector`底層是如何實現的嗎? > > 二師兄:`vector`底層使用動態數組來存儲元素對象,同時使用`size`和`capacity`記錄 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...