Spark Streaming任務延遲監控及告警

来源:https://www.cnblogs.com/xiaodf/archive/2019/11/01/11776915.html
-Advertisement-
Play Games

概述 StreamingListener 是針對spark streaming的各個階段的事件監聽機制。 StreamingListener介面 自定義StreamingListener 功能:監控批次處理時間,若超過閾值則告警,每次告警間隔2分鐘 應用 訂閱關註微信公眾號《大數據技術進階》,及時獲 ...


概述

StreamingListener 是針對spark streaming的各個階段的事件監聽機制。

StreamingListener介面

//需要監聽spark streaming中各個階段的事件只需實現這個特質中對應的事件函數即可
//本身既有註釋說明
trait StreamingListener {

  /** Called when the streaming has been started */
  /** streaming 啟動的事件 */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  /** 接收啟動事件 */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  /** 每個批次提交的事件 */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started.  */
  /** 每個批次啟動的事件 */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  /** 每個批次完成的事件  */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}

自定義StreamingListener

功能:監控批次處理時間,若超過閾值則告警,每次告警間隔2分鐘

class SparkStreamingDelayListener(private val appName:String, private val duration: Int,private val times: Int) extends StreamingListener{

  private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")

//每個批次完成時執行
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingStartTime = batchCompleted.batchInfo.processingStartTime
    val numRecords = batchCompleted.batchInfo.numRecords
    val processingEndTime = batchInfo.processingEndTime
    val processingDelay = batchInfo.processingDelay
    val totalDelay = batchInfo.totalDelay

    //將每次告警時間寫入redis,用以判斷告警間隔大於2分鐘
    val jedis = RedisClusterClient.getJedisClusterClient()
    val current_time = (System.currentTimeMillis / 1000).toInt
    val redis_time = jedis.get(appName)
    var flag = false
    if(redis_time==null || current_time-redis_time.toInt>120){
      jedis.set(appName,current_time.toString)
      flag = true
    }
    
    //若批次處理延遲大於批次時長指定倍數,並且告警間隔大約2分鐘,則告警
    if(totalDelay.get >= times * duration * 1000 && flag){
      val monitorContent = appName+": numRecords ->"+numRecords+",processingDelay ->"+processingDelay.get/1000+" s,totalDelay -> "+totalDelay.get/1000+"s"
      println(monitorContent)
      val msg = "Streaming_"+appName+"_DelayTime:"+totalDelay.get/1000+"S"
      val getURL = "http://node1:8002/message/weixin?msg="+msg
      HttpClient.doGet(getURL)
    }
  }
}

應用

//streamingListener不需要在配置中設置,可以直接添加到streamingContext中
object My{
    def main(args : Array[String]) : Unit = {
        val sparkConf = new SparkConf()
        val ssc = new StreamingContext(sparkConf,Seconds(20))
        ssc.addStreamingListener(new SparkStreamingDelayListener("Userid2Redis", duration,times))

        ....
    }
}

訂閱關註微信公眾號《大數據技術進階》,及時獲取更多大數據架構和應用相關技術文章!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 問題描述: 有時候伺服器操作導入數據.sql,或者 當需求不可以直接備份整庫還原時,往往通過導出資料庫腳本的方式來部署-還原資料庫表 但是當資料庫導出腳本很大,用Microsoft SQL Server Management Studio執行腳本時,往往會遇到“記憶體不足”的提示。 解決辦法: 用微軟 ...
  • 誤刪除用戶解決辦法 刪除用戶 刪除用戶 重啟mysql服務並登錄 恢復用戶 當登錄不上去首先停掉正在運行的資料庫 跳過授權表,跳過網路啟動資料庫 連接資料庫 刷新授權表 創建 root 超級用戶並退出 停止mysql服務 啟動資料庫服務 連接資料庫 查看創建的用戶的許可權 註意:以上只能在存在數據的時 ...
  • 安裝 mysql 源碼包安裝 優化基礎源 安裝依賴包 下載(或者上傳): 解壓: 確定安裝路徑存在不存在,不存在則創建 /opt/mysql-5.6.44: 生成: 編譯: 安裝: 創建用戶 拷貝啟動腳本 拷貝配置文件( 存在就覆蓋,不存在則創建 ) 創建socket文件存放目錄 製作軟連接 授權 ...
  • [Err] 1064 - You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 如果不 ...
  • [toc] 一、事務transaction (一)什麼是事務 事務是指一組操作,要麼都執行成功,要麼都執行失敗 1. start transaction:開啟事務 2. commit:提交確認 3. rollback:回滾,撤銷 (二)事務的ACID特性 (1)原子性Atomicity 指不能再分的 ...
  • [toc] 1. 資料庫是什麼 存數據的倉庫 2. 為什麼使用資料庫 1. 管理大量數據 2. 支持併發操作 3. 支持高級的操作,比如分組,鏈表等 3. 資料庫的分類 1. 關係型資料庫 表結構存儲,對每一列的數據的類型會有約束,數據存在硬碟中 Mysql(免費,企業用的多),maridb,Sql ...
  • 1.環境變數設置[oracle]$cat >> /home/oracle/.bash_profile > /home/oracle/.bashrc > $ORACLE_HOME/sqlplus/admin/glogin.sql " set time on set timing on set page... ...
  • redis沒有實現訪問控制這個功能,但是它提供了一個輕量級的認證方式,可以編輯redis.conf配置來啟用認證。 1、初始化Redis密碼: 在配置文件中有個參數: requirepass 這個就是配置redis訪問密碼的參數; 比如 requirepass 123456; (Ps:需重啟Redi ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...