Task 的實現在 Celery 中你會發現有兩處,一處位於 celery/app/task.py,這是第一個;第二個位於 celery/task/base.py 中,這是第二個。他們之間是有關係的,你可以認為第一個是對外暴露的介面,而第二個是具體的實現!所以,我們由簡入繁,先來看看對外的介面:其實... ...
Task 的實現在 Celery 中你會發現有兩處,一處位於 celery/app/task.py,這是第一個;第二個位於 celery/task/base.py 中,這是第二個。他們之間是有關係的,你可以認為第一個是對外暴露的介面,而第二個是具體的實現!所以,我們由簡入繁,先來看看對外的介面:
其實這就是個我們聲明 Task 的對象,例如我們使用這麼一段代碼:

我們可以看看 add 對象是啥:
In [1]: add
Out[1]: <@task: worker.add of tasks:0x10c9b06d0>
你會發現其實他就是我們的一個 Task 對象,所以你就可以觀察一下我們平時使用這個 add 的形式在裡面是如何實現的了,例如我們最常使用的可能就兩種方式了,分別是:
In [2]: add.delay()
In [3]: add.apply_async()
其他你看一下源碼就會發現他們的實現是一樣的,就像這樣:

我們現在很清楚,調用 apply_async
是將我們的 Task 提交到 MQ 中,然後獲得一個 celery.result.AsyncResult 對象,那麼具體都做了哪些工作,還是需要進一步查看的。apply_async
的參數有很多,所以我們需要先給他歸個類,這樣就好看多了,概括著看,可以分為這麼幾類:
- AMQP 類:connection、queue、exchange、routing_key、producer、publisher、headers
- MQ 策略類:countdown、eta、expires、retry、retry_policy、priority、
- 管理類:shadow、serializer、compression、add_to_parent
- 其他:args、kwargs、link、link_error、
這樣一看就感覺一目瞭然多了,AMQP 類的我們就不關註了,畢竟都看了這麼多了,應該大家都熟悉了。這裡的主要關註點還是在 MQ 策略類和管理類上,著重在 MQ 策略類上,因為管理類的功能稍微比較簡單一些。
async 發送消息
在 apply_async
中,我們可以看到有兩處執行邏輯,第一處是:

這裡是直接調用 apply
,然後這裡的條件 task_always_eager
是什麼意思我們還沒見過,可以看一下文檔:

ok,瞭解,其實就是說這是個同步的介面,那麼我們就可以對應著看到下麵這處應該是非同步的實現咯:

既然如此,我們一個個得來看。
同步發送消息實現
同步執行消息的一層函數比較簡單,只是簡單的構建了一個 tracer,然後就從 tracer 調用中拿到調用結果,我們看上去會比較舒服:

但是,這個 tracer 的內容就複雜啦,但是這個 build_tracer
的構建函數不需要太過關註,所以我們需要關註的是 build_tracer
返回的這個 tracer
函數,但是這個函數的內容很多,為了簡約一下,所以給大家抽象了一番。同步調用過程中,可以分為幾部分功能,分別是:
- 信號處理:執行前/後/成功這幾個時刻需要釋放一些信號給感興趣的成員
- 失敗處理:對於沒有執行的情況需要進行細分處理,例如:reject/ignore/retry/exception 等
- 依賴處理:因為 Celery 支持一些簡單的依賴,所以執行完成之後需要執行被依賴的 tasks
- 執行邏輯:這個就是正常的函數調用咯
- 其他:例如統計執行時間,出入棧之類的
@TODO:Q:這裡 task 的 callbacks 是什麼意思
我們就看下任務的執行邏輯是怎麼樣的,在代碼裡面是很簡單的一個函數調用,其實就是看 Task 對象有沒有實現 __call__
方法,如果沒有就使用 run
方法:

那 task 的 __call__
實現也不是太複雜,其實最後調用的也是 run
方法,所以到最後都還是 run
方法的責任啦,但是,這裡的基類是不實現 run
方法的,所以這個實現就落實到具體的實現類中了,那麼你以為 run
方法會在 celery/app/base.py 中實現?我之前也是這麼想的,但是,後來我發現不是的,這個實現其實就是我們在代碼裡面使用 @app.task
裝飾的函數,其實就是講我們自定義的函數封裝成 run
,這樣調用 run
不就執行的我們的函數了嗎?有意思吧,這個封裝的方式我們後面再說,也就是說同步的方式我們就到此吧,也差不多了。
非同步發送消息實現
看完同步的我們再來看看非同步的,在說非同步的之前,我們先思考一下,非同步的應該是怎樣?之前看的時候我猜想非同步不就是把 Task 對象塞進 MQ 中麽,就應該是這麼簡單,但是,看完之後發現還是 too young 了,因為從同步中我們就可以看出,還是有很多功課要完成的,不管怎樣,一起來再看一遍。
從前邊我們說有同步和非同步兩種形式那裡我們可以發現同步和非同步的除了功能不一樣之外,還有調用的對象也不一樣,同步的是調用 Task 自己的方法,也就是說消息被 Task 自己消化了;而非同步的確實使用的 Celery 對象的方法,也就是說還得依賴於 Celery 這個 Boss 來實現。這是為啥呢?很明顯嘛,Task 自身沒有關於 MQ 的任何消息,而只有一個綁定的 Celery 對象,所以從抽象層面就只能交給 Celery 了,而 Celery 卻包含了所有你需要的信息,是可以完成這個任務的。
所以,非同步的消息到了 Celery 是這麼被髮出去的:

這裡出現了一個我們還沒怎麼接觸過的 amqp
,但是沒關係,隨著等下的瞭解,我們會認識到它的,這裡的幾個關鍵步驟都是通過 amqp
來完成的,所以我們應該著重看看他們
非同步消息體的創建
在 Celery 中,非同步消息體是通過 create_task_message
來創建的,我們可以發現,這裡是傳了一大堆參數進去,但是,無妨,對於這些參數,我們大部分都在前面見過了,不怵,主要還是需要關註一下內部都為消息體做了什麼工作:

這裡可以發現兩件事情
- 消息體的預處理都是在這裡完成的,例如檢驗和轉換參數格式
- 構建消息就用了四個屬性:
headers
、properties
、body
和sent_event
這裡其實就是所有的構建消息體的代碼了,為什麼呢,因為 task_message 是一個 nametuple:

非同步消息的發送
非同步消息的發送這裡不是直接就調用的一個函數,而是動態得創建了一個 sender ,然後才調用這個 sender 發送的(沒搞懂為啥,為了擴展?)。而創建 sender 的邏輯倒是比較簡單,所以忽略了,直接來看真正的 send 操作是如何完成的,其實之前提過了,這裡真正的 send 操作就像之前我們看同步的執行邏輯一樣尿性,又臭又長,真的,又臭又長,而且作者自己都加註釋承認了,他的理由是為了性能!
同樣得,為了方便我們的理解,我還是採用抽絲剝繭的方式來給大家介紹一下,首先,我習慣性得分個類:
- MQ 的各項功能:routing_key/exchange/delivery_mode/retry
- 任務執行的前後處理:發送前/發送後
- 真正的發送邏輯
- 其他
其實重頭戲應該在 MQ 的參數確定上,因為只要這些參數都確定了,消息的發送只是一個 producer.publish
就解決的事情,所以我們花些精力來看看 MQ 的參數都是怎麼決定出來的:
- queue_name
- 調用
task.delay
的時候傳的,沒傳並且也沒傳exchange
那就是default
了 - 不會出現傳了
exchange
但是不傳queue
- 調用
- routing_key
- 調用
task.delay
的時候傳的,沒傳就看exchange
有沒有,沒有就是queue
的值了 - 如果參數傳了
exchange
,那麼就是配置中的預設routing_key
- 調用
- exchange:
- 調用
task.delay
的時候傳的,沒傳但是exchange_type
類型是direct
,那麼就是 "" - 如果類型不是
direct
,那麼 queue 有 exchange 就用,沒有就使用預設的
- 調用
- delivery_mode
- 調用
task.delay
的時候傳的,沒傳就看 queue 裡面有沒有,有就用 - 沒有就使用預設的
- 調用
- retry:
- 調用
task.delay
的時候傳了就用,沒傳就用預設的
- 調用
等這些參數確認完之後,就使用這些參數發送了!
然後這樣子就將消息發出去了,等待 Worker 的接收,而 worker 的接受邏輯我們之前已經看到了,其實還是註冊的 Consumer 的 on_message
附加
在前面我們說如何構建非同步消息體的時候,對於消息體只是簡單的用幾個 ...
忽略過,但是,對於整體理解來說,我們不應該忽略他們的實質內容,所以在最後我把他們都羅列出來,前後的會用到的。而且你會發現有點意思的是,對於我們的一個非同步調用,task
名和 id
都是放在 headers
裡頭的,而參數什麼的卻是放在 body
裡面,在我自己實現的非同步 MQ 裡面,這些都是放在 body
裡面的,這點我倒是不太欣賞 Celery 的。
headers

properties

body

send_event