Spark學習筆記3:鍵值對操作

来源:http://www.cnblogs.com/caiyisen/archive/2017/09/15/7521971.html
-Advertisement-
Play Games

鍵值對RDD通常用來進行聚合計算,Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為pair RDD。pair RDD提供了並行操作各個鍵或跨節點重新進行數據分組的操作介面。 Spark中創建pair RDD的方法:存儲鍵值對的數據格式會在讀取時直接返回由其鍵值對數據組成的pa ...


鍵值對RDD通常用來進行聚合計算,Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為pair RDD。pair RDD提供了並行操作各個鍵或跨節點重新進行數據分組的操作介面。

Spark中創建pair RDD的方法:存儲鍵值對的數據格會在讀取時直接返回由其鍵值對數據組成的pair RDD,還可以使用map()函數將一個普通的RDD轉為pair RDD。

  • Pair RDD的轉化操作
  1. reduceByKey()  與reduce類似 ,接收一個函數,並使用該函數對值進行合併,為每個數據集中的每個鍵進行並行的歸約操作。返回一個由各鍵和對應鍵歸約出來的結果值組成的新的RDD。例如 :上一章中單詞計數的例子:val counts  =  words.map(word => (word,1)).reduceByKey{ case (x,y) => x + y}
  2. foldByKey()與fold()類似,都使用一個與RDD和合併函數中的數據類型相同的零值最為初始值。val counts  =  words.map(word => (word,1)).foldByKey{ case (x,y) => x + y}
  3. combineByKey()是最為常用的基於鍵進行聚合的函數,可以返回與輸入類型不同的返回值。

  理解combineByKey處理數據流程,首先需要知道combineByKey的createCombiner()函數用來創建那個鍵對應的累加器的初始值,mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併。mergeCombiners()方法將各個分區的結果進行合併。

使用combineByKey進行單詞計數的例子:

import org.apache.spark.{SparkConf, SparkContext}

object word {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("word")
val sc = new SparkContext(conf)
val input = sc.parallelize(List(("coffee",1),("coffee",2),("panda",3),("coffee",9)))
val counts = input.combineByKey(
(v) => (v,1),
(acc:(Int,Int) ,v) => (acc._1 + v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1 + acc2._1,acc1._2 + acc2._2)
)
counts.foreach(println)
}
}

 輸出結果:

 

這個例子中的數據流示意圖如下:

 

 簡單說過程就是,將輸入鍵值對數據進行分區,每個分區先根據鍵計算相應的值以及鍵出現的次數。然後對不同分區進行合併得出最後的結果。

  4.groupByKey()使用RDD中的鍵來對數據進行分組,對於一個由類型K的鍵和類型V的值組成的RDD,所得到的結果RDD類型會是[K, Iterable[V] ]

 例如:

import org.apache.spark.{SparkConf, SparkContext}

object word {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("word")
    val sc = new SparkContext(conf)
    val input = sc.parallelize(List("scala spark scala core scala python java spark scala"))
    val words = input.flatMap(line => line.split(" ")).map(word => (word,1))
    val counts = words.groupByKey()
    counts.foreach(println)
  }
}

  輸出:

   5、cogroup函數對多個共用同一個鍵的RDD進行分組,對兩個鍵類型均為K而值類型分別為V和W的RDD進行cogroup時,得到的結果RDD類型為[(K,(Iterable[V],Iterable[W]))] 

  6、join(other)這樣的連接是內連接,只有在兩個pair RDD中都存在的鍵才輸出。若一個輸入對應的鍵有多個值時,生成的pair RDD會包括來自兩個輸入RDD的每一組相對應的記錄。理解這句話看下麵的例子:

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
val other = sc.parallelize(List((3,9)))
val joins = rdd.join(other)

  輸出結果:

  7、leftOuterJoin(other)左外連接和rightOuterJoin(other)右外連接都會根據鍵連接兩個RDD,但是允許結果中存在其中的一個pair RDD所缺失的鍵。

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
val other = sc.parallelize(List((3,9)))
val join1 = rdd.rightOuterJoin(other)

  輸出結果:

 

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
val other = sc.parallelize(List((3,9)))
val join2 = rdd.leftOuterJoin(other)

  輸出結果: 

 

  8、sortByKey()函數接收一個叫做ascending的參數,表示想要讓結果升序排序還是降序排序。

val input = sc.parallelize(List("scala spark scala core scala python java spark scala"))
    val words = input.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((x,y)=>x+y)
    val counts = words.sortByKey()

  輸出結果:

     

  •  Pair RDD的行動操作
  1. countByKey() 對每個鍵對應的元素分別計數。
  2. collectAsMap()將結果以映射表的形式返回,註意後面的value會覆蓋前面的。
    val num = sc.parallelize(List((1,2),(3,4),(3,6)))
    println(num.collectAsMap().mkString(" "))
    

    輸出結果:

  3. lookup(key)返回給定鍵對應的所有值。 
  • 數據分區

  Spark程式可以通過控制RDD分區方式來減少通信開銷。

運行下麵這段代碼,用來查看用戶查閱了自己訂閱的主題的頁面的數量,結果返回3:

val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題
val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問情況
val userData = sc.parallelize(list1)
val events = sc.parallelize(list2)
userData.persist()
val joined = userData.join(events)
val results = joined.filter({
case (id, (info, link)) =>
info.contains(link)
}
).count()
println(results)

  上面這段代碼中,用到了join操作,會將兩個數據集中的所有鍵的哈希值都求出來,將該哈希值相同的記錄通過網路傳到同一臺機器上,然後在那台機器上對所有鍵相同的記錄進行連接操作。

  假如userdata表很大很大,而且幾乎是不怎麼變化的,那麼每次都對userdata表進行哈希值計算和跨節點的數據混洗,就會產生很多的額外開銷。

如下:

解決這一產生額外開銷的方法就是,對userdata表使用partitionBy()轉化操作,將這張表轉為哈希分區。修改後的代碼如下:

    val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題
    val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問情況
    val userData = sc.parallelize(list1)
    val events = sc.parallelize(list2)
    userData.partitionBy(new DomainNamePartitioner(10)).persist()
    val joined = userData.join(events)
    val results = joined.filter({
      case (id, (info, link)) =>
        info.contains(link)
    }
    ).count()
    println(results)

  構建userData時調用了partitionBy(),在調用join()時,Spark只會對events進行數據混洗操作,將events中特定UserID的記錄發送到userData的對應分區所在的那台機器上。這樣,通過網路傳輸的數據就大大減少,程式運行速度也可以顯著提升。partitionBy()是一個轉化操作,因此它的返回值是一個新的RDD。

  新的數據處理過程如下:

  scala可以使用RDD的partitioner屬性來獲取RDD的分區方式,它會返回一個scala.Option對象。

  可以從數據分區中獲益的操作有cogroup() , groupWith() , join() , leftOuterJoin() , rightOuterJoin() , groupByKey() , reduceByKey() , combineByKey()以及lookup()。

  實現自定義分區器,需要繼承org.apache.spark.Partitioner類並實現下麵的三個方法:

  • numPartitions: Int :返回創建出來的分區數
  • getPartition(key: Any):Int : 返回給定鍵的分區編號(0 到 numPartitions - 1)
  • equals() : Java判斷相等的方法,Spark用這個方法來檢查分區器對象是否和其他分區器實例相同,這樣Spark才可以判斷兩個RDD的分區方式是否相同。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • MySqlInnoDB的事務隔離級別有四個:(預設是可重覆讀repeatable read) 未提交讀 read uncommit : 在另一個事務修改了數據,但尚未提交,在本事務中SELECT語句可能會查詢到這些未被提交的數據,而發生臟讀。 提交讀 read commit : 在一個事務中發生兩次 ...
  • ORACLE虛擬索引(Virtual Index) 虛擬索引概念 虛擬索引(Virtual Indexes)是一個定義在數據字典中的假索引(fake index),它沒有相關的索引段。虛擬索引的目的是模擬索引的存在而不用真實的創建一個完整索引。這允許開發者創建虛擬索引來查看相關執行計劃而不用等到真實... ...
  • 1. 索引的特性 1.1 加快條件的檢索的特性 當表數據量越來越大時查詢速度會下降,在表的條件欄位上使用索引,快速定位到可能滿足條件的記錄,不需要遍歷所有記錄。 #在這個案例中:執行同一條SQL。t2有索引的執行數據是0.052 ms;t1沒有索引的是:5.741 ms; 1.2 有序的特性 索引本 ...
  • 文件和文件組填充策略 文件組對組內的所有文件都使用按比例填充策略。當數據寫入文件組時,SQL Server 資料庫引擎按文件中的可用空間比例將數據寫入文件組中的每個文件,而不是將所有數據都寫入第一個文件直至其變滿為止。然後再寫入下一個文件。例如,如果文件 f1 有 100 MB 可用空間,文件 f2 ...
  • --通過代碼方式新建資料庫create database MyDatabase0911New --(MyDatabase0911New要創建的資料庫名稱)on primary --資料庫文件( --名字 name='MyDatabase0911New_data', --路徑 filename='G: ...
  • SQL是Structure Query language(結構化查詢語言)的縮寫,它是使用關係模型的資料庫應用語言。在眾多開源資料庫中,MySQL正是其中最傑出的代表,MySQL是由三個瑞典人於20世紀90年代開發的一個關係型資料庫。並用了創始人之一Michael Widenius女兒的名字My命名 ...
  • 最近做了幾個PowerBI報表,對PowerBI的設計有了更深的理解,對數據的塑形(sharp data),不僅可以在Data Source中實現,例如在TSQL查詢腳本中,而且可以在PowerBI中實現,例如,向數據模型中添加自定義欄位,或者在報表數據顯示時,根據數據表之間的關係做數據的統計。本文 ...
  • 索引對於提高查詢性能非常有效,因此,一般應該考慮應該考慮為分區表建立索引,為分區表建立索引與為普通表建立索引的語法一直,但是,其行為與普通索引有所差異。預設情況下,分區表中創建的索引使用與分區表相同分區架構和分區列,這樣,索引將於表對齊。將表與其索引對齊,可以使管理工作更容易進行,對於滑動視窗方案尤 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...