Spark Streaming數據限流簡述

来源:https://www.cnblogs.com/softlin/archive/2020/01/19/12215446.html
-Advertisement-
Play Games

  Spark Streaming對實時數據流進行分析處理,源源不斷的從數據源接收數據切割成一個個時間間隔進行處理;    流處理與批處理有明顯區別,批處理中的數據有明顯的邊界、數據規模已知;而流處理數據流並沒有邊界,也未知數據規模;    ...


  Spark Streaming對實時數據流進行分析處理,源源不斷的從數據源接收數據切割成一個個時間間隔進行處理;
  流處理與批處理有明顯區別,批處理中的數據有明顯的邊界、數據規模已知;而流處理數據流並沒有邊界,也未知數據規模;
  由於流處理的數據流特征,使之數據流具有不可預測性,而且數據處理的速率還與硬體、網路等資源有關,在這種情況下如不對源源不斷進來的數據流速率進行限制,那當Spark節點故障、網路故障或數據處理吞吐量下來時還有數據不斷流進來,那將有可能將出現OOM進而導致Spark Streaming程式崩潰;
  在Spark Streaming中不同的數據源採用不同的限速策略,但無論是Socket數據源的限流策略還是Kafka數據源的限流策略其速率(rate)的計算都是使用PIDController演算法進行計算而得來;
  下麵從源碼的角度分別介紹Socket數據源Kafka數據源的限流處理。

速率限制的計算與更新

  Spark Streaming的流處理其實是基於微批處理(MicroBatch)的,也就是說將數據流按某比較小的時間間隔將數據切割成為一段段微批數據進行處理;

  StreamingContext調用Start()啟動的時候會將速率控制器(rateController)添加到StreamingListener監聽器中;
  當每批次處理完成時將觸發監聽器(RateController),使用該批處理的處理結束時間、處理延遲時間、調度延遲時間、記錄行數調用PIDRateEstimator傳入PID演算法中(PID Controller)計算出該批次的速率(rate)並更新速率限制(rateLimit)與發佈該限制速率;

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo

for {
  processingEnd <- batchCompleted.batchInfo.processingEndTime
  workDelay <- batchCompleted.batchInfo.processingDelay
  waitDelay <- batchCompleted.batchInfo.schedulingDelay
  elems <- elements.get(streamUID).map(_.numRecords)
 } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}

private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
  val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
  newRate.foreach { s =>
    rateLimit.set(s.toLong)
    publish(getLatestRate())
  }
}

Socket數據源限流

  批次的限制速率上面已經算出,這裡說的是接收Socket過來的數據時的數據限流;
  SocketInputStream類receive方法接收到數據後將數據存入 BlockGenerator的Buffer中,在寫入Buffer前調用限流器 (RateLimiter)對寫入數據進行限流;
  RateLimiter限流器使用了Google開源的 Guava中內置的RateLimiter限流器,該類只是對Guava限流器的簡單封裝;
  在Spark Streaming中可通過使用兩個參數配置初始速率與最大速率spark.streaming.receiver.maxRate、spark.streaming.backpressure.initialRate;亦可配置PIDController演算法相關的四個參數值;
  RateLimiter限流器是基於令牌桶的演算法基本原理比較簡單,以一個恆定的速率生成令牌放入令牌桶中,桶滿則停止,處理請求時需要從令牌桶中取出令牌,當桶中無令牌可取時阻塞等待,此演算法用於確保系統不被洪峰擊垮。

  private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
/**
  * Push a single data item into the buffer.
 */
 def addData(data: Any): Unit = {
  if (state == Active) {
//調用限流器等待
  waitToPush()
  synchronized {
    if (state == Active) {
      currentBuffer += data
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }
} else {
  throw new SparkException(
    "Cannot add data as BlockGenerator has not been started or has been stopped")
 }
}

def waitToPush() {
   //限流器申請令牌
   rateLimiter.acquire()
}

Guava庫中RateLimiter限流器基本使用:

//創建限流器,每秒產生令牌數1
    RateLimiter rateLimiter=RateLimiter.create(1);
    for (int i = 0; i < 10; i++) {
        //獲得一個令牌,未申請到令牌則阻塞等待
        double waitTime = rateLimiter.acquire();
        System.out.println(String.format("id:%d time:%d waitTime:%f",i,System.currentTimeMillis(),waitTime));
    }

Kafka數據源限流的實現

  在Spark Streaming Kafka包拉取Kafka數據會進行如下動作:
  1、取Kafka中最新偏移量、分區
  2、通過rateController限制每個分區可拉取的最大消息數
  3、在DirectKafkaInputDStream中創建KafkaRDD,在其中調用相關對象拉取數據

  通過如上步驟也可用看出,只要限制了Kafka某個分區的偏移量(offset)範圍也就可限制從Kafka拉取的消息數量,從而達到限流的目的,Spark streaming kafka也是通過此實現的;

計算每個分區速率限制,有如下步驟:
  1、通過seekToEnd獲取最新可用偏移量與當前偏移量對比獲得當前所有分區延遲偏移量
  單個分區偏移量延遲=最新偏移量記錄-當前偏移量記錄
  2、獲取配置項中每個分區最大速率
(spark.streaming.kafka.maxRatePerPartition),背壓率計算,計算每個分區背壓率計算公式為:
  單個分區背壓率=單個分區偏移量延遲/所有分區總延遲*速率限制
  速率限制(rateLimit):為通過PIDController動態計算得來

  如有配置每個分區最大速率則取配置項最大速率與背壓率兩者中的最小值未配置則取背壓率作為每個分區速率限制;

  3、將批次間隔(batchDuration)*每個分區速率限制=每個分區最大消息數
  4、取當前分區偏移量+分區最大消息數 與 最新偏移量兩者當中最小的,由此來控制拉取消息速率;

  如當前偏移量+分區最大消息數 大於 最新偏移量則取 最新偏移量否則取 當前偏移量+分區最大消息數作為拉取Kafka數據的Offset範圍

// 限制每個分區最大消息數
protected def clamp(
offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {

maxMessagesPerPartition(offsets).map { mmp =>
  mmp.map { case (tp, messages) =>
      val uo = offsets(tp)
      tp -> Math.min(currentOffsets(tp) + messages, uo)
  }
}.getOrElse(offsets)
}

  不管是Kafka數據源還是Socket數據源Spark Streaming中都使用了PIDController演算法用於計算其速率限制值,兩者的差別也只是因為兩種數據源的獲取方式數據特征而決定的。Socket數據源使用了Guava RateLimiter、而Kafka數據源自己實現了基於Offsets的限流
  以上說介紹的框架版本為:Spark Streaming 版本為2.3.2與spark-streaming-kafka-0-10_2.11;

參考資料:
http://kafka.apache.org
http://spark.apache.org

文章首發地址:Solinx
https://mp.weixin.qq.com/s/yHStZgTAGBPoOMpj4e27Jg


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本文是針對我的工具 "藍奏雲批量下載工具" 的補充說明筆記,準備按照流程整理我實現軟體的思路與方法。 涉及知識 Java的IO流 Java的下載文件 HtmlUnit的使用方法 okhttp的使用 分析與軟體思路 在某一天,我找到了一部電子書的資源,但是,該藍奏雲地址是一個文件夾,由於藍奏雲不支持批 ...
  • Java 泛型通配符 ? extends super 的用法 示例 1 : ? extends ArrayList heroList 表示這是一個Hero泛型或者其子類泛型 heroList 的泛型可能是Hero heroList 的泛型可能是APHero heroList 的泛型可能是ADHero ...
  • 1. 欄位查詢 通過模型類.objects屬性可以調用如下函數,實現對模型類對應的數據表的查詢。 函數名 功能 返回值 說明 get 返回表中滿足條件的一條且只能有一條數據。 返回值是一個模型類對象。 參數中寫查詢條件。 1)如果查到多條數據,則拋異常MultipleObjectsReturned。 ...
  • 寫在前面 首先,祝大家新年快樂,在國人的情結里,現在才是真正的年底,估計現在好多朋友已經陸續回家過節去了,祝回家的路順風。過年回去就好好陪伴家人,不要看一些毒瘤號寫的,過年時間彎道超越別人,趁著假期學習什麼各種新知識,這完全瞎扯,這一年的時間該努力,努力了,吃苦也吃了,春節就好好陪伴家人吃喝玩樂。 ...
  • Python的駐留機制及為在同一運行空間內,當兩變數的值相同,則地址也相同。 舉例: 以上示例為駐留機制有效的情況下的記憶體地址。 以下為加入非數字、字母、下劃線內容的駐留機制驗證,結果為無效 ...
  • 在經常性讀取大量的數值文件時(比如深度學習訓練數據),可以考慮現將數據存儲為Numpy格式,然後直接使用Numpy去讀取,速度相比為轉化前快很多. 下麵就常用的保存數據到二進位文件和保存數據到文本文件進行介紹: 1.保存為二進位文件(.npy/.npz) numpy.save 保存一個數組到一個二進 ...
  • list list(列表)是Python內置的一種數據類型,它是一種有序、可變的集合,可以隨時添加和刪除其中的元素。 變數classmates就是一個list。關於list的操作如下: 方法 len():獲取list元素的個數 通過索引訪問 append:向list末尾追加元素 insert:把元素 ...
  • @ "TOC" JDK1.7:數組+鏈表 JDK1.8:數組+鏈表+紅黑樹 前五個問題環境用的是是JDK1.7,後面全部是1.8 1、Hash的計算規則? 簡單的說是個“擾動函數”,目的是為了使散列分佈的更加均勻。 具體演算法是用key的Hashcode值右移16位,將hashcode高位和低位的值進 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...