鍵值對RDD通常用來進行聚合計算,Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為pair RDD。pair RDD提供了並行操作各個鍵或跨節點重新進行數據分組的操作介面。 Spark中創建pair RDD的方法:存儲鍵值對的數據格式會在讀取時直接返回由其鍵值對數據組成的pa ...
鍵值對RDD通常用來進行聚合計算,Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為pair RDD。pair RDD提供了並行操作各個鍵或跨節點重新進行數據分組的操作介面。
Spark中創建pair RDD的方法:存儲鍵值對的數據格式會在讀取時直接返回由其鍵值對數據組成的pair RDD,還可以使用map()函數將一個普通的RDD轉為pair RDD。
- Pair RDD的轉化操作
- reduceByKey() 與reduce類似 ,接收一個函數,並使用該函數對值進行合併,為每個數據集中的每個鍵進行並行的歸約操作。返回一個由各鍵和對應鍵歸約出來的結果值組成的新的RDD。例如 :上一章中單詞計數的例子:val counts = words.map(word => (word,1)).reduceByKey{ case (x,y) => x + y}
- foldByKey()與fold()類似,都使用一個與RDD和合併函數中的數據類型相同的零值最為初始值。val counts = words.map(word => (word,1)).foldByKey{ case (x,y) => x + y}
- combineByKey()是最為常用的基於鍵進行聚合的函數,可以返回與輸入類型不同的返回值。
理解combineByKey處理數據流程,首先需要知道combineByKey的createCombiner()函數用來創建那個鍵對應的累加器的初始值,mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併。mergeCombiners()方法將各個分區的結果進行合併。
使用combineByKey進行單詞計數的例子:
import org.apache.spark.{SparkConf, SparkContext}
object word {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("word")
val sc = new SparkContext(conf)
val input = sc.parallelize(List(("coffee",1),("coffee",2),("panda",3),("coffee",9)))
val counts = input.combineByKey(
(v) => (v,1),
(acc:(Int,Int) ,v) => (acc._1 + v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1 + acc2._1,acc1._2 + acc2._2)
)
counts.foreach(println)
}
}
輸出結果:
這個例子中的數據流示意圖如下:
簡單說過程就是,將輸入鍵值對數據進行分區,每個分區先根據鍵計算相應的值以及鍵出現的次數。然後對不同分區進行合併得出最後的結果。
4.groupByKey()使用RDD中的鍵來對數據進行分組,對於一個由類型K的鍵和類型V的值組成的RDD,所得到的結果RDD類型會是[K, Iterable[V] ]
例如:
import org.apache.spark.{SparkConf, SparkContext} object word { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("word") val sc = new SparkContext(conf) val input = sc.parallelize(List("scala spark scala core scala python java spark scala")) val words = input.flatMap(line => line.split(" ")).map(word => (word,1)) val counts = words.groupByKey() counts.foreach(println) } }
輸出:
5、cogroup函數對多個共用同一個鍵的RDD進行分組,對兩個鍵類型均為K而值類型分別為V和W的RDD進行cogroup時,得到的結果RDD類型為[(K,(Iterable[V],Iterable[W]))]
6、join(other)這樣的連接是內連接,只有在兩個pair RDD中都存在的鍵才輸出。若一個輸入對應的鍵有多個值時,生成的pair RDD會包括來自兩個輸入RDD的每一組相對應的記錄。理解這句話看下麵的例子:
val rdd = sc.parallelize(List((1,2),(3,4),(3,6))) val other = sc.parallelize(List((3,9))) val joins = rdd.join(other)
輸出結果:
7、leftOuterJoin(other)左外連接和rightOuterJoin(other)右外連接都會根據鍵連接兩個RDD,但是允許結果中存在其中的一個pair RDD所缺失的鍵。
val rdd = sc.parallelize(List((1,2),(3,4),(3,6))) val other = sc.parallelize(List((3,9))) val join1 = rdd.rightOuterJoin(other)
輸出結果:
val rdd = sc.parallelize(List((1,2),(3,4),(3,6))) val other = sc.parallelize(List((3,9))) val join2 = rdd.leftOuterJoin(other)
輸出結果:
8、sortByKey()函數接收一個叫做ascending的參數,表示想要讓結果升序排序還是降序排序。
val input = sc.parallelize(List("scala spark scala core scala python java spark scala")) val words = input.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((x,y)=>x+y) val counts = words.sortByKey()
輸出結果:
- Pair RDD的行動操作
- countByKey() 對每個鍵對應的元素分別計數。
- collectAsMap()將結果以映射表的形式返回,註意後面的value會覆蓋前面的。
val num = sc.parallelize(List((1,2),(3,4),(3,6))) println(num.collectAsMap().mkString(" "))
輸出結果:
- lookup(key)返回給定鍵對應的所有值。
- 數據分區
Spark程式可以通過控制RDD分區方式來減少通信開銷。
運行下麵這段代碼,用來查看用戶查閱了自己訂閱的主題的頁面的數量,結果返回3:
val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題
val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問情況
val userData = sc.parallelize(list1)
val events = sc.parallelize(list2)
userData.persist()
val joined = userData.join(events)
val results = joined.filter({
case (id, (info, link)) =>
info.contains(link)
}
).count()
println(results)
上面這段代碼中,用到了join操作,會將兩個數據集中的所有鍵的哈希值都求出來,將該哈希值相同的記錄通過網路傳到同一臺機器上,然後在那台機器上對所有鍵相同的記錄進行連接操作。
假如userdata表很大很大,而且幾乎是不怎麼變化的,那麼每次都對userdata表進行哈希值計算和跨節點的數據混洗,就會產生很多的額外開銷。
如下:
解決這一產生額外開銷的方法就是,對userdata表使用partitionBy()轉化操作,將這張表轉為哈希分區。修改後的代碼如下:
val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題 val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問情況 val userData = sc.parallelize(list1) val events = sc.parallelize(list2) userData.partitionBy(new DomainNamePartitioner(10)).persist() val joined = userData.join(events) val results = joined.filter({ case (id, (info, link)) => info.contains(link) } ).count() println(results)
構建userData時調用了partitionBy(),在調用join()時,Spark只會對events進行數據混洗操作,將events中特定UserID的記錄發送到userData的對應分區所在的那台機器上。這樣,通過網路傳輸的數據就大大減少,程式運行速度也可以顯著提升。partitionBy()是一個轉化操作,因此它的返回值是一個新的RDD。
新的數據處理過程如下:
scala可以使用RDD的partitioner屬性來獲取RDD的分區方式,它會返回一個scala.Option對象。
可以從數據分區中獲益的操作有cogroup() , groupWith() , join() , leftOuterJoin() , rightOuterJoin() , groupByKey() , reduceByKey() , combineByKey()以及lookup()。
實現自定義分區器,需要繼承org.apache.spark.Partitioner類並實現下麵的三個方法:
- numPartitions: Int :返回創建出來的分區數
- getPartition(key: Any):Int : 返回給定鍵的分區編號(0 到 numPartitions - 1)
- equals() : Java判斷相等的方法,Spark用這個方法來檢查分區器對象是否和其他分區器實例相同,這樣Spark才可以判斷兩個RDD的分區方式是否相同。