Spark踩坑記:共用變數

来源:http://www.cnblogs.com/liuliliuli2017/archive/2017/04/28/6782687.html
-Advertisement-
Play Games

收錄待用,修改轉載已取得 "騰訊雲" 授權 前言 前面總結的幾篇spark踩坑博文中,我總結了自己在使用spark過程當中踩過的一些坑和經驗。我們知道Spark是多機器集群部署的,分為Driver/Master/Worker,Master負責資源調度,Worker是不同的運算節點,由Master統一 ...


收錄待用,修改轉載已取得騰訊雲授權


前言

前面總結的幾篇spark踩坑博文中,我總結了自己在使用spark過程當中踩過的一些坑和經驗。我們知道Spark是多機器集群部署的,分為Driver/Master/Worker,Master負責資源調度,Worker是不同的運算節點,由Master統一調度。

而Driver是我們提交Spark程式的節點,並且所有的reduce類型的操作都會彙總到Driver節點進行整合。節點之間會將map/reduce等操作函數傳遞一個獨立副本到每一個節點,這些變數也會複製到每台機器上,而節點之間的運算是相互獨立的,變數的更新並不會傳遞迴Driver程式。

那麼有個問題,如果我們想在節點之間共用一份變數,比如一份公共的配置項,該怎麼辦呢?Spark為我們提供了兩種特定的共用變數,來完成節點間變數的共用。 本文首先簡單的介紹spark以及spark streaming中累加器和廣播變數的使用方式,然後重點介紹一下如何更新廣播變數。

累加器

顧名思義,累加器是一種只能通過關聯操作進行“加”操作的變數,因此它能夠高效的應用於並行操作中。它們能夠用來實現counters和sums。Spark原生支持數值類型的累加器,開發者可以自己添加支持的類型,在2.0.0之前的版本中,通過繼承AccumulatorParam來實現,而2.0.0之後的版本需要繼承AccumulatorV2來實現自定義類型的累加器。

如果創建了一個具名的累加器,它可以在spark的UI中顯示。這對於理解運行階段(running stages)的過程有很重要的作用。如下圖:

在2.0.0之前版本中,累加器的聲明使用方式如下:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

累加器的聲明在2.0.0發生了變化,到2.1.0也有所變化,具體可以參考官方文檔,我們這裡以2.1.0為例將代碼貼一下:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

廣播變數

累加器比較簡單直觀,如果我們需要在spark中進行一些全局統計就可以使用它。但是有時候僅僅一個累加器並不能滿足我們的需求,比如資料庫中一份公共配置表格,需要同步給各個節點進行查詢。OK先來簡單介紹下spark中的廣播變數:

廣播變數允許程式員緩存一個只讀的變數在每台機器上面,而不是每個任務保存一份拷貝。例如,利用廣播變數,我們能夠以一種更有效率的方式將一個大數據量輸入集合的副本分配給每個節點。Spark也嘗試著利用有效的廣播演算法去分配廣播變數,以減少通信的成本。

一個廣播變數可以通過調用SparkContext.broadcast(v)方法從一個初始變數v中創建。廣播變數是v的一個包裝變數,它的值可以通過value方法訪問,下麵的代碼說明瞭這個過程:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

從上文我們可以看出廣播變數的聲明很簡單,調用broadcast就能搞定,並且scala中一切可序列化的對象都是可以進行廣播的,這就給了我們很大的想象空間,可以利用廣播變數將一些經常訪問的大變數進行廣播,而不是每個任務保存一份,這樣可以減少資源上的浪費。

更新廣播變數(rebroadcast)

廣播變數可以用來更新一些大的配置變數,比如資料庫中的一張表格,那麼有這樣一個問題,如果資料庫當中的配置表格進行了更新,我們需要重新廣播變數該怎麼做呢。上文對廣播變數的說明中,我們知道廣播變數是只讀的,也就是說廣播出去的變數沒法再修改,那麼我們應該怎麼解決這個問題呢?
答案是利用spark中的unpersist函數

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

上文是從spark官方文檔摘抄出來的,我們可以看出,正常來說每個節點的數據是不需要我們操心的,spark會自動按照LRU規則將老數據刪除,如果需要手動刪除可以調用unpersist函數。

那麼更新廣播變數的基本思路:將老的廣播變數刪除(unpersist),然後重新廣播一遍新的廣播變數,為此簡單包裝了一個用於廣播和更新廣播變數的wraper類,如下:

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

// This wrapper lets us update brodcast variables within DStreams' foreachRDD
// without running into serialization issues
case class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {
    // 刪除RDD是否需要鎖定
    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

利用該wrapper更新廣播變數,大致的處理邏輯如下:

// 定義
val yourBroadcast = BroadcastWrapper[yourType](ssc, yourValue)

yourStream.transform(rdd => {
  //定期更新廣播變數
  if (System.currentTimeMillis - someTime > Conf.updateFreq) {
    yourBroadcast.update(newValue, true)
  }
  // do something else
})

總結

spark中的共用變數是我們能夠在全局做出一些操作,比如record總數的統計更新,一些大變數配置項的廣播等等。而對於廣播變數,我們也可以監控資料庫中的變化,做到定時的重新廣播新的數據表配置情況,另外我使用上述方式,在每天千萬級的數據實時流統計中表現穩定,所以有相似問題的同學也可以進行嘗試,有任何問題,歡迎隨時騷擾溝通^v^

廣告下我們項目:專註於游戲輿情的挖掘分析,歡迎大家來踩踩

http://wetest.qq.com/bee/

參考文獻

Spark Programming Guide2.1.0
Spark Programming Guide1.6.3
共用變數
How can I update a broadcast variable in spark streaming?


原文鏈接:https://www.qcloud.com/community/article/407582


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 給 iOS 應用添加推送功能是一件比較麻煩的事情,本篇文章收集了集成 jpush react native 的常見問題,目的是為了幫助用戶更好地排查問題 1. 收不到推送 確保是在真機上測試,而不是在模擬器 自己的應用已經在 Apple developer 給應用配置推送功能,創建推送證書 (並且保 ...
  • 原文首發我的主力博客 http://anforen.com/wp/2017/04/android_ksoap2_unexpected_type_position_end_document_null_java_io_inputstreamreader/ jpg圖片文件較小時,比如200KB左右,一般不 ...
  • 找了好幾個都不能使用 試了一下這個 可以使用 放地址:http://www.orsoon.com/Mac/85386.html ...
  • 空指針異常: 04-27 01:13:57.270: E/AndroidRuntime(4942): FATAL EXCEPTION: main04-27 01:13:57.270: E/AndroidRuntime(4942): Process: com.itheima.mobilesafe74, ...
  • 轉載請註明出處: http://www.cnblogs.com/cnwutianhao/p/6772759.html MVP架構模式 大家都不陌生,Google 也給出過相應的參考 Sample, 但是有的人會有疑問為啥 GitHub 上面大神寫的 MVP架構模式 和 Google 的不太一樣。 G ...
  • 獲取新的設備UDID "蒲公英獲取UDID" 新設備中打開以上網址,按照安裝提示即可快速獲取UDID 開發者賬號中註冊新設備 更新項目相關的描述文件 確認之後重新下載描述文件。 確保xcode安裝(雙擊)的本地的描述文件是最新的 為了確保我們現在使用的描述文件與以前的不存在衝突,所以需要把舊的與應用 ...
  • 1. View 的getDrawingCache方法 有時候需要將某個view的內容以圖片的方式保存下來,感覺就和截圖差不多,可以使用View 的getDrawingCache方法,返回一個Bitmap對象。 2. View的getDrawingCache的具體實現 查看View的getDrawin ...
  • 主要涉及兩個技術點:1、圖標加灰色過濾;2、Android的圖片資源預設是靜態的,單實例;如果兩個IM好友的頭像一樣,最簡單的都是用的軟體自帶頭像,有一個線上,一個離線,直接改變頭像的灰度,則兩個用戶的頭像都會變灰或者線上,答案是:Drawable.mutate()。代碼如下: ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...