python學習筆記3-celery分散式任務處理器

来源:http://www.cnblogs.com/suyuan1573/archive/2016/11/07/6037006.html
-Advertisement-
Play Games

celery是用python寫的一個非同步的任務框架,功能非常強大,具體的說明可以查看官網,這裡主要提供點demo讓你迅速使用該框架 1.環境安裝 預設安裝好了redis pip install celery pip install celery redis 用來作為任務消息的載體 2. tasks. ...


celery是用python寫的一個非同步的任務框架,功能非常強大,具體的說明可以查看官網,這裡主要提供點demo讓你迅速使用該框架   1.環境安裝 預設安裝好了redis pip install celery redis 用來作為任務消息的載體   2. tasks.py  
import sys
reload(sys)
sys.setdefaultencoding('utf-8’)  # 不加這句話,列印中文log會出錯
 
from celery import Celery
 
celery = Celery('tasks', broker='redis://127.0.0.1:6379/0') #選擇本地redis db=0 作為消息載體, 第一個參數為任務名稱
from celery.utils.log import get_task_logger # 倒入celery 中的log模塊
logger = get_task_logger(__name__)
 
@celery.task(bind=True, max_retries=10,
             default_retry_delay=1 * 6)  # bind 表示開啟, max_retries 是重新嘗試的次數,default_retry_delay 是預設的間隔時間,嘗試的時間
def exec_task_order_overtime(self, order_id):  # 訂單到期後,執行訂單失效的任務 
    try:
        logger.info('===================> exec_task_order_overtime order_id=%s' % order_id)
        success = BaseHandler.context_services.order_overtime_task_service.process_over_time(order_id)
        if success is False:
            logger.error(
                '<================order_overtime_task_service.process_over_time Failed, order_id=%s' % order_id)
            raise Return(False)
        else:
            logger.info(
                '<=================order_overtime_task_service.process_over_time Success, order_id=%s' % order_id)
    except Exception as exc:
        logger.info('exec_task_order_overtime retry, order_id=%s' % order_id)
        raise self.retry(exc=exc, countdown=3)  # 3 秒後繼續嘗試, 此處的countdown 優先順序高於裝飾器中的default_retry_delay
 

 

該文件路徑下執行命令, 此後celery 開始作為消費之執行任務 $celery worker -A tasks --loglevel=info   生產者呢?執行下麵語句後,redis中的db=0 的 celery key是一個 list 類型, 裡面存放著執行任務,如果celery沒有開啟可以清晰看到;開啟了celery可能已經被執行了
from celery import Celery
 
celery = Celery('tasks', broker='redis://127.0.0.1:6379/0') #消息載體
push_task_id = celery.send_task('tasks.exec_task_order_overtime'
                                , [order_id] # 參數,必須為list,具體可見源碼,第三個可以為dict,我們這裡沒有使用
                                , countdown=10)  #延時多久執行 推送消息 

疑問1:

  有的人會奇怪為什麼exec_task_order_overtime有self, 有時候發現沒有,區別在與裝飾器task,如果@celery.task, 不是裝飾器函數調用,則沒有self,The bind argument to the task decorator will give access to self (the task type instance)     疑問2:   celery 的構造函數的參數,第一個為模塊名,也就是本文件名稱, 第二個為redis載體地址,看官網介紹 The first argument to Celery is the name of the current module, this only needed so names can be automatically generated when the tasks are defined in the __main__module. The second argument is the broker keyword argument, specifying the URL of the message broker you want to use. Here using RabbitMQ (also the default option).   疑問3:   celery記憶體不足時,沒有反饋機制: 我們知道socket網路傳輸中,當接受端來不及處理的時候,發送端會阻塞;celery 完全沒有相關機制; 我們不需要發送端阻塞,當然也不能,但是我們的celery來不及處理時,應該緩存一些在redis中,雖然會造成消息處理不及時,但是也不至於記憶體不足的問題出現,這裡可能需要自己做一些處理; 例如:retry次數控制儘可能少,任務執行失敗後記錄下來,根據業務需要比如10分鐘後再次拋入redis消息隊列中        
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • http://www.jxedt.com/wen/jiaolian/3174965368951734360.html http://www.jxedt.com/wen/jiaolian/3174965371099414616.html http://www.jxedt.com/wen/jiaolia ...
  • http://www.jxedt.com/wen/quzheng/3174953227258822691.html http://www.jxedt.com/wen/quzheng/3174953224864464911.html http://www.jxedt.com/wen/quzheng/3 ...
  • http://www.jxedt.com/wen/quzheng/3174967347882688535.html http://www.jxedt.com/wen/quzheng/3174967343587459095.html http://www.jxedt.com/wen/quzheng/3 ...
  • http://www.jxedt.com/wen/yueche/3174970377368174680.html http://www.jxedt.com/wen/yueche/3174970378273685527.html http://www.jxedt.com/wen/yueche/3174 ...
  • http://www.jxedt.com/wen/luxian/3174956997276205144.htmlhttp://www.jxedt.com/wen/luxian/3174957016605851736.htmlhttp://www.jxedt.com/wen/luxian/317495 ...
  • http://www.jxedt.com/wen/luxian/3174971623024951384.html http://www.jxedt.com/wen/luxian/3174971623025016920.html http://www.jxedt.com/wen/luxian/3174 ...
  • Struts2的開發流程 為了能夠在eclipse中使用Struts2在進行開發時,需要根據需要導入一些有關的jar包; 在官網下載相關的壓縮包,這裡下載了兩個:struts-2.3.30-all.zip和struts-2.3.30-docs.zip,解壓到同一個文件夾中。 在eclipse中新建了 ...
  • python 關於redis的基本操作網上已經很多了,這裡主要介紹點個人覺得有意思的內容1.redis的事務操作以及watch 樂觀鎖;後面描述2.tornado下非同步使用redis的方式 redis是單進程單線程模型,本身應對外部請求的是單任務的,也是多線程安全的,這個大家都應該知道的, 所以才會 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...