Celery框架

来源:https://www.cnblogs.com/xiongmozhou/archive/2018/07/18/9332864.html
-Advertisement-
Play Games

在學習Celery之前,我先簡單的去瞭解了一下什麼是生產者消費者模式。 生產者消費者模式 在實際的軟體開發過程中,經常會碰到如下場景:某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為 ...


在學習Celery之前,我先簡單的去瞭解了一下什麼是生產者消費者模式。

生產者消費者模式

在實際的軟體開發過程中,經常會碰到如下場景:某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。

單單抽象出生產者和消費者,還夠不上是生產者消費者模式。該模式還需要有一個緩衝區處於生產者和消費者之間,作為一個中介。生產者把數據放入緩衝區,而消費者從緩衝區取出數據,如下圖所示:

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過消息隊列(緩衝區)來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給消息隊列,消費者不找生產者要數據,而是直接從消息隊列里取,消息隊列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。這個消息隊列就是用來給生產者和消費者解耦的。------------->這裡又有一個問題,什麼叫做解耦?

解耦:假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩衝區,兩者之間不直接依賴,耦合也就相應降低了。生產者直接調用消費者的某個方法,還有另一個弊端。由於函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。緩衝區還有另一個好處。如果製造數據的速度時快時慢,緩衝區的好處就體現出來了。當數據製造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩衝區中。等生產者的製造速度慢下來,消費者再慢慢處理掉。

因為太抽象,看過網上的說明之後,通過我的理解,我舉了個例子:吃包子。

假如你非常喜歡吃包子(吃起來根本停不下來),今天,你媽媽(生產者)在蒸包子,廚房有張桌子(緩衝區),你媽媽將蒸熟的包子盛在盤子(消息)里,然後放到桌子上,你正在看巴西奧運會,看到蒸熟的包子放在廚房桌子上的盤子里,你就把盤子取走,一邊吃包子一邊看奧運。在這個過程中,你和你媽媽使用同一個桌子放置盤子和取走盤子,這裡桌子就是一個共用對象。生產者添加食物,消費者取走食物。桌子的好處是,你媽媽不用直接把盤子給你,只是負責把包子裝在盤子里放到桌子上,如果桌子滿了,就不再放了,等待。而且生產者還有其他事情要做,消費者吃包子比較慢,生產者不能一直等消費者吃完包子把盤子放回去再去生產,因為吃包子的人有很多,如果這期間你好朋友來了,和你一起吃包子,生產者不用關註是哪個消費者去桌子上拿盤子,而消費者只去關註桌子上有沒有放盤子,如果有,就端過來吃盤子中的包子,沒有的話就等待。對應關係如下圖:

 

考察了一下,原來當初設計這個模式,主要就是用來處理併發問題的,而Celery就是一個用python寫的並行分散式框架。

然後我接著去學習Celery

Celery的定義

Celery(芹菜)是一個簡單、靈活且可靠的,處理大量消息的分散式系統,並且提供維護這樣一個系統的必需工具。

    我比較喜歡的一點是:Celery支持使用任務隊列的方式在分佈的機器、進程、線程上執行任務調度。然後我接著去理解什麼是任務隊列。

任務隊列

任務隊列是一種線上程或機器間分發任務的機制。

消息隊列

消息隊列的輸入是工作的一個單元,稱為任務,獨立的職程(Worker)進程持續監視隊列中是否有需要處理的新任務。

Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,之後中間人把消息派送給職程,職程對消息進行處理。如下圖所示:

 

Celery 系統可包含多個職程和中間人,以此獲得高可用性和橫向擴展能力。

Celery的架構

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件

Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成,包括,RabbitMQ,Redis,MongoDB等,這裡我先去瞭解RabbitMQ,Redis

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的運行在分散式的系統節點中

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這裡我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。

然後我接著去安裝Celery,在安裝Celery之前,我已經在自己虛擬機上安裝好了Python,版本是2.7,是為了更好的支持Celery的3.0以上的版本。

 

因為涉及到消息中間件,所以我先去選擇一個在我工作中要用到的消息中間件(在Celery幫助文檔中稱呼為中間人<broker>),為了更好的去理解文檔中的例子,我安裝了兩個中間件,一個是RabbitMQ,一個redis。

在這裡我就先根據Celery3.1的幫助文檔安裝和設置RabbitMQ, 要使用 Celery,我們需要創建一個 RabbitMQ 用戶、一個虛擬主機,並且允許這個用戶訪問這個虛擬主機。下麵是我在個人虛擬機Ubuntu14.04上的設置:

$ sudo rabbitmqctl add_user forward password

#創建了一個RabbitMQ用戶,用戶名為forward,密碼是password

$ sudo rabbitmqctl add_vhost ubuntu

#創建了一個虛擬主機,主機名為ubuntu

$ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*"

#允許用戶forward訪問虛擬主機ubuntu,因為RabbitMQ通過主機名來與節點通信

$ sudo rabbitmq-server

之後我啟用RabbitMQ伺服器,結果如下,成功運行:

 

之後我安裝Redis,它的安裝比較簡單,如下:

$ sudo pip install redis

然後進行簡單的配置,只需要設置 Redis 資料庫的位置:

BROKER_URL = 'redis://localhost:6379/0'

URL的格式為:

redis://:password@hostname:port/db_number

URL Scheme 後的所有欄位都是可選的,並且預設為 localhost 的 6379 埠,使用資料庫 0。我的配置是:

redis://:password@ubuntu:6379/5

之後安裝Celery,我是用標準的Python工具pip安裝的,如下:

$ sudo pip install celery

為了測試Celery能否工作,我運行了一個最簡單的任務,編寫tasks.py,如下圖所示:

 

編輯保存退出後,我在當前目錄下運行如下命令:

$ celery -A tasks worker --loglevel=info

#查詢文檔,瞭解到該命令中-A參數表示的是Celery APP的名稱,這個實例中指的就是tasks.py,後面的tasks就是APP的名稱,worker是一個執行任務角色,後面的loglevel=info記錄日誌類型預設是info,這個命令啟動了一個worker,用來執行程式中add這個加法任務(task)。

然後看到界面顯示結果如下:

 

我們可以看到Celery正常工作在名稱ubuntu的虛擬主機上,版本為3.1.23,在下麵的[config]中我們可以看到當前APP的名稱tasks,運輸工具transport就是我們在程式中設置的中間人redis://127.0.0.1:6379/5,result我們沒有設置,暫時顯示為disabled,然後我們也可以看到worker預設使用perfork來執行併發,當前併發數顯示為1,然後可以看到下麵的[queues]就是我們說的隊列,當前預設的隊列是celery,然後我們看到下麵的[tasks]中有一個任務tasks.add.

瞭解了這些之後,根據文檔我重新打開一個terminal,然後執行Python,進入Python交互界面,用delay()方法調用任務,執行如下操作:

 

這個任務已經由之前啟動的Worker非同步執行了,然後我打開之前啟動的worker的控制台,對輸出進行查看驗證,結果如下:

 

綠色部分第一行說明worker收到了一個任務:tasks.add,這裡我們和之前發送任務返回的AsyncResult對比我們發現,每個task都有一個唯一的ID,第二行說明瞭這個任務執行succeed,執行結果為12。

查看資料說調用任務後會返回一個AsyncResult實例,可用於檢查任務的狀態,等待任務完成或獲取返回值(如果任務失敗,則為異常和回溯)。但這個功能預設是不開啟的,需要設置一個 Celery 的結果後端(backend),這塊我在下一個例子中進行了學習。

通過這個例子後我對Celery有了初步的瞭解,然後我在這個例子的基礎上去進一步的學習。

因為Celery是用Python編寫的,所以為了讓代碼結構化一些,就像一個應用,我使用python包,創建了一個celery服務,命名為pj。文件目錄如下:

 

celery.py

 

from __future __ import absolute_import

#定義未來文件的絕對進口,而且絕對進口必須在每個模塊的頂部啟用。

from celery import Celery

#從celery導入Celery的應用程式介面

App.config_from_object(‘pj.config’)

#從config.py中導入配置文件

if __name__ == ‘__main__’:

   app.start()

#執行當前文件,運行celery

app = Celery(‘pj’,

broker=‘redis://localhost’,

backend=‘redis://localhost’,

include=[‘pj.tasks’]

)

    #首先創建了一個celery實例app,實例化的過程中,制定了任務名pj(與當前文件的名字相同),Celery的第一個參數是當前模塊的名稱,在這個例子中就是pj,後面的參數可以在這裡直接指定,也可以寫在配置文件中,我們可以調用config_from_object()來讓Celery實例載入配置模塊,我的例子中的配置文件起名為config.py,配置文件如下:

   

    在配置文件中我們可以對任務的執行等進行管理,比如說我們可能有很多的任務,但是我希望有些優先順序比較高的任務先被執行,而不希望先進先出的等待。那麼需要引入一個隊列的問題. 也就是說在我的broker的消息存儲裡面有一些隊列,他們並行運行,但是worker只從對應 的隊列裡面取任務。在這裡我們希望tasks.py中的add先被執行。task中我設置了兩個任務:

所以我通過from celery import group引入group,用來創建並行執行的一組任務。然後這塊現需要理解的就是這個@app.task,@符號在python中用作函數修飾符,到這塊我又回頭去看python的裝飾器(在代碼運行期間動態增加功能的方式)到底是如何實現的,在這裡的作用就是通過task()裝飾器在可調用的對象(app)上創建一個任務。

    瞭解完裝飾器後,我回過頭去整理配置的問題,前面提到任務的優先順序問題,在這個例子中如果我們想讓add這個加法任務優先於subtract減法任務被執行,我們可以將兩個任務放到不同的隊列中,由我們決定先執行哪個任務,我們可以在配置文件中這樣配置:

 

先瞭解了幾個常用的參數的含義:

Exchange:交換機,決定了消息路由規則;

Queue:消息隊列;

Channel:進行消息讀寫的通道;

Bind:綁定了Queue和Exchange,意即為符合什麼樣路由規則的消息,將會放置入哪一個消息隊列

我將add這個函數任務放在了一個叫做for_add的隊列裡面,將subtract這個函數任務放在了一個叫做for_subtract的隊列裡面,然後我在當前應用目錄下執行命令:

 

這個worker就只負責處理for_add這個隊列的任務,執行這個任務:

 

任務已經被執行,我在worker控制台查看結果:

 

可以看到worker收到任務,並且執行了任務。

在這裡我們還是在交互模式下手動去執行,我們想要crontab的定時生成和執行,我們可以用celery的beat去周期的生成任務和執行任務,在這個例子中我希望每10秒鐘產生一個任務,然後去執行這個任務,我可以這樣配置:

 

使用了scheduler,要制定時區:CELERY_TIMEZONE = 'Asia/Shanghai',啟動celery加上-B的參數:

 

並且要在config.py中加入from datetime import timedelta。

更近一步,如果我希望在每周四的19點30分生成任務,分發任務,讓worker取走執行,可以這樣配置:

 

看完這些基礎的東西,我回過頭對celery在回顧了一下,用圖把它的框架大致畫出來,如下圖:

 

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • MIPS架構下的MCU,指令集包含R-Type、I-Type、J-Type三種,在數電課程設計時為了給MCU編寫指令集,需要將彙編語言轉化成機器代碼,這裡分享一下自己寫的Matlab 的 GUI。 主函數 C2M 函數rig_f 用來尋找名稱對應的寄存器地址 函數rig_n 用來將5位十進位數轉換成 ...
  • 原創 The Suspects Time Limit: 1000MS Memory Limit: 20000K Total Submissions: 48698 Accepted: 23286 Description Severe acute respiratory syndrome (SARS), ...
  • 1. 學習計劃 第二天:商品列表功能實現 1、服務中間件dubbo 2、工程改造為基於soa架構 3、商品列表查詢功能實現。 2. 將工程改造為SOA架構 2.1. 分析 由於宜立方商城是基於soa的架構,表現層和服務層是不同的工程。所以要實現商品列表查詢需要兩個系統之間進行通信。 如何實現遠程通信 ...
  • provider pom 連接註冊器register需要applicationContext需要web.xml載入配置文件監聽器 註冊 介面 實現類 註意這個@Service是dubbo的Service 右擊maven項目run as選maven build.. 輸入tomcat7:run 啟動這個 ...
  • w2 16、第二周-第02章節-Python3.5-模塊初識 sys模塊 sys.path sys.argv os模塊 os.system os.popen os.mkdir 17、第二周-第03章節-Python3.5-模塊初識2 18、第二周-第04章節-Python3.5-pyc是什麼 19、 ...
  • http://www.cnblogs.com/baixl/p/4170599.html ...
  • gets()函數 因為用gets函數輸入數組時,只知道數組開始處,不知道數組有多少個元素,輸入字元過長,會導致緩衝區溢出,多餘字元可能占用未使用的記憶體,也可能擦掉程式中的其他數據,後續用fgets函數代替。 fgets函數 一小段代碼舉例: (1) fgets函數一次讀入10 - 1個字元,如果少於 ...
  •   傳送文件描述符是高併發網路服務編程的一種常見實現方式。 "Nebula" 高性能通用網路框架即採用了UNIX域套接字傳遞文件描述符設計和實現。本文詳細說明一下傳送文件描述符的應用。 1. TCP伺服器程式設計範式   開發一個伺服器程式,有較多的的程式設計 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...