Celery 源碼解析三: Task 對象的實現

来源:http://www.cnblogs.com/makor/archive/2017/11/13/implement-of-Task.html
-Advertisement-
Play Games

Task 的實現在 Celery 中你會發現有兩處,一處位於 celery/app/task.py,這是第一個;第二個位於 celery/task/base.py 中,這是第二個。他們之間是有關係的,你可以認為第一個是對外暴露的介面,而第二個是具體的實現!所以,我們由簡入繁,先來看看對外的介面:其實... ...


Task 的實現在 Celery 中你會發現有兩處,一處位於 celery/app/task.py,這是第一個;第二個位於 celery/task/base.py 中,這是第二個。他們之間是有關係的,你可以認為第一個是對外暴露的介面,而第二個是具體的實現!所以,我們由簡入繁,先來看看對外的介面:

其實這就是個我們聲明 Task 的對象,例如我們使用這麼一段代碼:

我們可以看看 add 對象是啥:

In [1]: add
Out[1]: <@task: worker.add of tasks:0x10c9b06d0>

你會發現其實他就是我們的一個 Task 對象,所以你就可以觀察一下我們平時使用這個 add 的形式在裡面是如何實現的了,例如我們最常使用的可能就兩種方式了,分別是:

In [2]: add.delay()
In [3]: add.apply_async()

其他你看一下源碼就會發現他們的實現是一樣的,就像這樣:

我們現在很清楚,調用 apply_async 是將我們的 Task 提交到 MQ 中,然後獲得一個 celery.result.AsyncResult 對象,那麼具體都做了哪些工作,還是需要進一步查看的。apply_async 的參數有很多,所以我們需要先給他歸個類,這樣就好看多了,概括著看,可以分為這麼幾類:

  • AMQP 類:connection、queue、exchange、routing_key、producer、publisher、headers
  • MQ 策略類:countdown、eta、expires、retry、retry_policy、priority、
  • 管理類:shadow、serializer、compression、add_to_parent
  • 其他:args、kwargs、link、link_error、

這樣一看就感覺一目瞭然多了,AMQP 類的我們就不關註了,畢竟都看了這麼多了,應該大家都熟悉了。這裡的主要關註點還是在 MQ 策略類和管理類上,著重在 MQ 策略類上,因為管理類的功能稍微比較簡單一些。

async 發送消息

apply_async 中,我們可以看到有兩處執行邏輯,第一處是:

這裡是直接調用 apply,然後這裡的條件 task_always_eager 是什麼意思我們還沒見過,可以看一下文檔:

ok,瞭解,其實就是說這是個同步的介面,那麼我們就可以對應著看到下麵這處應該是非同步的實現咯:

既然如此,我們一個個得來看。

同步發送消息實現

同步執行消息的一層函數比較簡單,只是簡單的構建了一個 tracer,然後就從 tracer 調用中拿到調用結果,我們看上去會比較舒服:

但是,這個 tracer 的內容就複雜啦,但是這個 build_tracer 的構建函數不需要太過關註,所以我們需要關註的是 build_tracer 返回的這個 tracer 函數,但是這個函數的內容很多,為了簡約一下,所以給大家抽象了一番。同步調用過程中,可以分為幾部分功能,分別是:

  • 信號處理:執行前/後/成功這幾個時刻需要釋放一些信號給感興趣的成員
  • 失敗處理:對於沒有執行的情況需要進行細分處理,例如:reject/ignore/retry/exception 等
  • 依賴處理:因為 Celery 支持一些簡單的依賴,所以執行完成之後需要執行被依賴的 tasks
  • 執行邏輯:這個就是正常的函數調用咯
  • 其他:例如統計執行時間,出入棧之類的

@TODO:Q:這裡 task 的 callbacks 是什麼意思

我們就看下任務的執行邏輯是怎麼樣的,在代碼裡面是很簡單的一個函數調用,其實就是看 Task 對象有沒有實現 __call__ 方法,如果沒有就使用 run 方法:

那 task 的 __call__ 實現也不是太複雜,其實最後調用的也是 run 方法,所以到最後都還是 run 方法的責任啦,但是,這裡的基類是不實現 run 方法的,所以這個實現就落實到具體的實現類中了,那麼你以為 run 方法會在 celery/app/base.py 中實現?我之前也是這麼想的,但是,後來我發現不是的,這個實現其實就是我們在代碼裡面使用 @app.task 裝飾的函數,其實就是講我們自定義的函數封裝成 run,這樣調用 run 不就執行的我們的函數了嗎?有意思吧,這個封裝的方式我們後面再說,也就是說同步的方式我們就到此吧,也差不多了。

非同步發送消息實現

看完同步的我們再來看看非同步的,在說非同步的之前,我們先思考一下,非同步的應該是怎樣?之前看的時候我猜想非同步不就是把 Task 對象塞進 MQ 中麽,就應該是這麼簡單,但是,看完之後發現還是 too young 了,因為從同步中我們就可以看出,還是有很多功課要完成的,不管怎樣,一起來再看一遍。

從前邊我們說有同步和非同步兩種形式那裡我們可以發現同步和非同步的除了功能不一樣之外,還有調用的對象也不一樣,同步的是調用 Task 自己的方法,也就是說消息被 Task 自己消化了;而非同步的確實使用的 Celery 對象的方法,也就是說還得依賴於 Celery 這個 Boss 來實現。這是為啥呢?很明顯嘛,Task 自身沒有關於 MQ 的任何消息,而只有一個綁定的 Celery 對象,所以從抽象層面就只能交給 Celery 了,而 Celery 卻包含了所有你需要的信息,是可以完成這個任務的。

所以,非同步的消息到了 Celery 是這麼被髮出去的:

這裡出現了一個我們還沒怎麼接觸過的 amqp,但是沒關係,隨著等下的瞭解,我們會認識到它的,這裡的幾個關鍵步驟都是通過 amqp 來完成的,所以我們應該著重看看他們

非同步消息體的創建

在 Celery 中,非同步消息體是通過 create_task_message 來創建的,我們可以發現,這裡是傳了一大堆參數進去,但是,無妨,對於這些參數,我們大部分都在前面見過了,不怵,主要還是需要關註一下內部都為消息體做了什麼工作:

這裡可以發現兩件事情

  1. 消息體的預處理都是在這裡完成的,例如檢驗和轉換參數格式
  2. 構建消息就用了四個屬性:headerspropertiesbodysent_event

這裡其實就是所有的構建消息體的代碼了,為什麼呢,因為 task_message 是一個 nametuple:

非同步消息的發送

非同步消息的發送這裡不是直接就調用的一個函數,而是動態得創建了一個 sender ,然後才調用這個 sender 發送的(沒搞懂為啥,為了擴展?)。而創建 sender 的邏輯倒是比較簡單,所以忽略了,直接來看真正的 send 操作是如何完成的,其實之前提過了,這裡真正的 send 操作就像之前我們看同步的執行邏輯一樣尿性,又臭又長,真的,又臭又長,而且作者自己都加註釋承認了,他的理由是為了性能!

同樣得,為了方便我們的理解,我還是採用抽絲剝繭的方式來給大家介紹一下,首先,我習慣性得分個類:

  • MQ 的各項功能:routing_key/exchange/delivery_mode/retry
  • 任務執行的前後處理:發送前/發送後
  • 真正的發送邏輯
  • 其他

其實重頭戲應該在 MQ 的參數確定上,因為只要這些參數都確定了,消息的發送只是一個 producer.publish 就解決的事情,所以我們花些精力來看看 MQ 的參數都是怎麼決定出來的:

  • queue_name
    1. 調用 task.delay 的時候傳的,沒傳並且也沒傳 exchange 那就是 default
    2. 不會出現傳了 exchange 但是不傳 queue
  • routing_key
    1. 調用 task.delay 的時候傳的,沒傳就看 exchange 有沒有,沒有就是 queue 的值了
    2. 如果參數傳了 exchange,那麼就是配置中的預設 routing_key
  • exchange:
    1. 調用 task.delay 的時候傳的,沒傳但是 exchange_type 類型是 direct,那麼就是 ""
    2. 如果類型不是 direct,那麼 queue 有 exchange 就用,沒有就使用預設的
  • delivery_mode
    1. 調用 task.delay 的時候傳的,沒傳就看 queue 裡面有沒有,有就用
    2. 沒有就使用預設的
  • retry:
    1. 調用 task.delay 的時候傳了就用,沒傳就用預設的

等這些參數確認完之後,就使用這些參數發送了!

然後這樣子就將消息發出去了,等待 Worker 的接收,而 worker 的接受邏輯我們之前已經看到了,其實還是註冊的 Consumer 的 on_message

附加

在前面我們說如何構建非同步消息體的時候,對於消息體只是簡單的用幾個 ... 忽略過,但是,對於整體理解來說,我們不應該忽略他們的實質內容,所以在最後我把他們都羅列出來,前後的會用到的。而且你會發現有點意思的是,對於我們的一個非同步調用,task 名和 id 都是放在 headers 裡頭的,而參數什麼的卻是放在 body 裡面,在我自己實現的非同步 MQ 裡面,這些都是放在 body 裡面的,這點我倒是不太欣賞 Celery 的。

headers

properties

body

send_event


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 異常的分類 在使用上 Error不用管他虛擬機錯誤 Exception必須要用catch抓 RuntimeExcption可以處理也可以不用處理 說明 Error:稱為錯誤,由java虛擬機生成並拋出,包括動態連接失敗,虛擬機錯誤,程式對其不做處理。 Exception:所有異常的父類,其子類對應了 ...
  • 初入Python,一開始就被她簡介的語法所吸引,代碼簡潔優雅,之前在C#裡面打開文件寫入文件等操作相比Python複雜多了,而Python打開、修改和保存文件顯得簡單得多。 1、打開文件的例子: 2、利用urllib庫請求頁面進行簡單的翻譯,請求百度翻譯,將要翻譯的內容當做參數傳給百度,然後將結果賦 ...
  • 1、@RequestMapping RequestMapping是一個用來處理請求地址映射的註解,可用於類或方法上。用於類上,表示類中的所有響應請求的方法 都是以該地址作為父路徑,類和方法共同組成的字元串才是一個完整的url. RequestMapping註解有六個屬性,下麵我們把她分成三類進行說明 ...
  • Alt+Enter 自動添加包Ctrl+t SVN更新Ctrl+k SVN提交Ctrl + / 註釋(取消註釋)選擇的行Ctrl+Shift+F 高級查找Ctrl+Enter 補全Shift + Enter 開始新行TAB Shift+TAB 縮進/取消縮進所選擇的行Ctrl + Alt + I 自 ...
  • 程式需求: 流程圖: 好像畫的不咋地 查看代碼: #!/usr/bin/env python # _*_ coding:utf-8 _*_ # File_type:一個登錄介面 # Author:smelond import os username = "smelond"#用戶名 password ...
  • java內部類是從JDK1.1開始出現的,因此,很多人都不陌生,但是又會覺得不熟悉。原因是平時編寫代碼時可能用到的場景不多,用得最多的是在有事件監聽的情況下。所以,這裡將從四個方面做一個簡單的總結: 一.內部類基礎 在Java中,可以將一個類定義在另一個類裡面或者一個方法裡面,這樣的類稱為內部類。廣 ...
  • 之前對python的標準輸入輸出關註不多,所以在碰到flush的時候有了諸多疑惑。 本文記錄了和flush方法相關的緩衝的相關內容,供學習參考。 數據緩衝 在電腦科學中,數據緩衝(data buffer)是一個當數據需要從一個地方移到另一個地方時,用來暫時保存數據的物理區域。常見的情況是,當輸入數 ...
  • java異常的概念 執行期的錯誤(javac xxx.java) 運行期的錯誤(java xxx) 這裡講的是運行期出現的錯誤 上面這段代碼的輸出結果是3 為啥12沒列印出來呢?System.out.println(arr[4])這段帶代碼想獲取數組中的第五個元素但是沒有所以報錯了直接執行了catc ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...