# RDD的Transformation運算元 ## map map運算元的功能為做映射,即將原來的RDD中對應的每一個元素,應用外部傳入的函數進行運算,返回一個新的RDD ```Scala val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8 ...
RDD的Transformation運算元
map
map運算元的功能為做映射,即將原來的RDD中對應的每一個元素,應用外部傳入的函數進行運算,返回一個新的RDD
val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
val rdd2: RDD[Int] = rdd1.map(_ * 2)
flatMap
flatMap運算元的功能為扁平化映射,即將原來RDD中對應的每一個元素應用外部的運算邏輯進行運算,然後再將返回的數據進行壓平,類似先map,然後再flatten的操作,最後返回一個新的RDD
val arr = Array(
"spark hive flink",
"hive hive flink",
"hive spark flink",
"hive spark flink"
)
val rdd1: RDD[String] = sc.makeRDD(arr, 2)
val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))
filter
filter的功能為過濾,即將原來RDD中對應的每一個元素,應用外部傳入的過濾邏輯,然後返回一個新的的RDD
val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
val rdd2: RDD[Int] = rdd1.filter(_ % 2 == 0)
mapPartitions
將數據以分區為的形式返回進行map操作,一個分區對應一個迭代器,該方法和map方法類似,只不過該方法的參數由RDD中的每一個元素變成了RDD中每一個分區的迭代器,如果在映射的過程中需要頻繁創建額外的對象,使用mapPartitions要比map高效的過。
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 2)
var r1: RDD[Int] = rdd1.mapPartitions(it => it.map(x => x * 10))
map和mapPartitions的區別,mapPartitions一定會比map效率更高嗎?
不一定:如果對RDD中的數據進行簡單的映射操作,例如變大寫,對數據進行簡單的運算,map和mapPartitions的效果是一樣的,但是如果是使用到了外部共用的對象或資料庫連接,mapPartitions效率會更高一些。
原因:map出入的函數是一條一條的進行處理,如果使用資料庫連接,會每來一條數據創建一個連接,導致性能過低,而mapPartitions傳入的函數參數是迭代器,是以分區為單位進行操作,可以事先創建好一個連接,反覆使用,操作一個分區中的多條數據。
特別提醒:如果使用mapPartitions方法不當,即將迭代器中的數據toList,就是將數據都放到記憶體中,可能會出現記憶體溢出的情況。
mapPartitionsWithIndex
類似於mapPartitions, 不過函數要輸入兩個參數,第一個參數為分區的索引,第二個是對應分區的迭代器。函數的返回的是一個經過該函數轉換的迭代器。
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
it.map(e => s"partition: $index, val: $e")
})
keys
RDD中的數據為對偶元組類型,調用keys方法後返回一個新的的RDD,該RDD的對應的數據為原來對偶元組的全部key,該方法有隱式轉換
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過並行化的方式創建RDD,分區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val keyRDD: RDD[String] = wordAndOne.keys
values
RDD中的數據為對偶元組類型,調用values方法後返回一個新的的RDD,該RDD的對應的數據為原來對偶元組的全部values
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過並行化的方式創建RDD,分區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val valueRDD: RDD[Int] = wordAndOne.values
mapValues
RDD中的數據為對偶元組類型,將value應用傳入的函數進行運算後再與key組合成元組返回一個新的RDD
val lst = List(("spark", 5), ("hive", 3), ("hbase", 4), ("flink", 8))
val rdd1: RDD[(String, Int)] = sc.parallelize(lst, 2)
//將每一個元素的次數乘以10再可跟key組合在一起
//val rdd2 = rdd1.map(t => (t._1, t._2 * 10))
val rdd2 = rdd1.mapValues(_ * 10)
flatMapValues
RDD中的數據為對偶元組類型,將value應用傳入的函數進行flatMap打平後再與key組合成元組返回一個新的RDD
val lst = List(("spark", "1,2,3"), ("hive", "4,5"), ("hbase", "6"), ("flink", "7,8"))
val rdd1: RDD[(String, String)] = sc.parallelize(lst, 2)
//將value打平,再將打平後的每一個元素與key組合("spark", "1,2,3") =>("spark",1),("spark",2),("spark",3)
val rdd2: RDD[(String, Int)] = rdd1.flatMapValues(_.split(",").map(_.toInt))
// val rdd2 = rdd1.flatMap(t => {
// t._2.split(",").map(e => (t._1, e.toInt))
// })
uion
將兩個類型一樣的RDD合併到一起,返回一個新的RDD,新的RDD的分區數量是原來兩個RDD的分區數量之和
//兩個RDD進行union,對應的數據類型必須一樣
//Union不會去重
val rdd1 = sc.parallelize(List(1,2,3,4), 2)
val rdd2 = sc.parallelize(List(5, 6, 7, 8, 9,10), 3)
val rdd3 = rdd1.union(rdd2)
println(rdd3.partitions.length)
reduceByKey
將數據按照相同的key進行聚合,特點是先在每個分區中進行局部分組聚合,然後將每個分區聚合的結果從上游拉取到下游再進行全局分組聚合
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過並行化的方式創建RDD,分區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
combineByKey
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過並行化的方式創建RDD,分區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//調用combineByKey傳入三個函數
//val reduced = wordAndOne.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
val f1 = (x: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f1 function invoked in state: $stage, partition: $partition")
x
}
//在每個分區內,將key相同的value進行局部聚合操作
val f2 = (a: Int, b: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f2 function invoked in state: $stage, partition: $partition")
a + b
}
//第三個函數是在下游完成的
val f3 = (m: Int, n: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f3 function invoked in state: $stage, partition: $partition")
m + n
}
val reduced = wordAndOne.combineByKey(f1, f2, f3)
combineByKey要傳入三個函數:
第一個函數:在上游執行,該key在當前分區第一次出現時,對value處理的運算邏輯
第二個函數:在上游執行,當該key在當前分區再次出現時,將以前相同key的value進行運算的邏輯
第三個函數:在下游執行,將來自不同分區,相同key的數據通過網路拉取過來,然後進行全局聚合的邏輯
groupByKey
按照key進行分組,底層使用的是ShuffledRDD,mapSideCombine = false,傳入的三個函數只有前兩個被調用了,並且是在下游執行的
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過並行化的方式創建RDD,分區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//按照key進行分組
val grouped: RDD[(String, Iterable[Int])] = wordAndOne.groupByKey()
foldByKey
與reduceByKey類似,只不過是可以指定初始值,每個分區應用一次初始值,先在每個進行局部聚合,然後再全局聚合,局部聚合的邏輯與全局聚合的邏輯相同。
val lst: Seq[(String, Int)] = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過並行化的方式創建RDD,分區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//與reduceByKey類似,只不過是可以指定初始值,每個分區應用一次初始值
val reduced: RDD[(String, Int)] = wordAndOne.foldByKey(0)(_ + _)
aggregateByKey
與reduceByKey類似,並且可以指定初始值,每個分區應用一次初始值,傳入兩個函數,分別是局部聚合的計算邏輯、全局聚合的邏輯。
val lst: Seq[(String, Int)] = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過並行化的方式創建RDD,分區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//在第一個括弧中傳入初始化,第二個括弧中傳入兩個函數,分別是局部聚合的邏輯和全局聚合的邏輯
val reduced: RDD[(String, Int)] = wordAndOne.aggregateByKey(0)(_ + _, _ + _)
ShuffledRDD
reduceByKey、combineByKey、aggregateByKey、foldByKey底層都是使用的ShuffledRDD,並且mapSideCombine = true
val f1 = (x: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f1 function invoked in state: $stage, partition: $partition")
x
}
//在每個分區內,將key相同的value進行局部聚合操作
val f2 = (a: Int, b: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f2 function invoked in state: $stage, partition: $partition")
a + b
}
//第三個函數是在下游完成的
val f3 = (m: Int, n: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f3 function invoked in state: $stage, partition: $partition")
m + n
}
//指定分區器為HashPartitioner
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val shuffledRDD = new ShuffledRDD[String, Int, Int](wordAndOne, partitioner)
//設置聚合親器並關聯三個函數
val aggregator = new Aggregator[String, Int, Int](f1, f2, f3)
shuffledRDD.setAggregator(aggregator) //設置聚合器
shuffledRDD.setMapSideCombine(true) //設置map端聚合
如果設置了setMapSideCombine(true),那麼聚合器中的三個函數都會執行,前兩個在上游執行,第三個在下游執行
如果設置了setMapSideCombine(false),那麼聚合器中的三個函數只會執行前兩個,並且這兩個函數都是在下游執行
distinct
distinct是對RDD中的元素進行取重,底層使用的是reduceByKey實現的,先局部去重,然後再全局去重
val arr = Array(
"spark", "hive", "spark", "flink",
"spark", "hive", "hive", "flink",
"flink", "flink", "flink", "spark"
)
val rdd1: RDD[String] = sc.parallelize(arr, 3)
//去重
val rdd2: RDD[String] = rdd1.distinct()
distinct的底層實現如下:
Scala
val rdd11: RDD[(String, Null)] = rdd1.map((_, null))
val rdd12: RDD[String] = rdd11.reduceByKey((a, _) => a).keys
partitionBy
按照指的的分區器進行分區,底層使用的是ShuffledRDD
val lst: Seq[(String, Int)] = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過並行化的方式創建RDD,分區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
//按照指定的分區進行分區
val partitioned: RDD[(String, Int)] = wordAndOne.partitionBy(partitioner)
repartitionAndSortWithinPartitions
按照值的分區器進行分區,並且將數據按照指的的排序規則在分區內排序,底層使用的是ShuffledRDD,設置了指定的分區器和排序規則
val lst: Seq[(String, Int)] = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過並行化的方式創建RDD,分區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
//按照指定的分區進行分區,並且將數據按照指定的排序規則在分區內排序
val partitioned = wordAndOne.repartitionAndSortWithinPartitions(partitioner)
repartitionAndSortWithinPartitions的底層實現:
Scala
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
sortBy
val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
//切分壓平
val words: RDD[String] = lines.flatMap(_.split(" "))
//將單詞和1組合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//分組聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//按照單詞出現的次數,從高到低進行排序
val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
sortByKey
按照指的的key排序規則進行全局排序
val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
//切分壓平
val words: RDD[String] = lines.flatMap(_.split(" "))
//將單詞和1組合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//分組聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//按照單詞出現的次數,從高到低進行排序
//val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
//val keyed: RDD[(Int, (String, Int))] = reduced.keyBy(_._2).sortByKey()
val sorted = reduced.map(t => (t._2, t)).sortByKey(false)
sortBy、sortByKey是Transformation,但是為什麼會生成job?
因為sortBy、sortByKey需要實現全局排序,使用的是RangePartitioner,在構建RangePartitioner時,會對數據進行採樣,所有會觸發Action,根據採樣的結果來構建RangePartitioner。
RangePartitioner可以保證數據按照一定的範圍全局有序,同時在shuffle的同時,有設置了setKeyOrdering,這樣就又可以保證數據在每個分區內有序了!
reparation
reparation的功能是重新分區,一定會shuffle,即將數據打散。reparation的功能是改變分區數量(可以增大、減少、不變)可以將數據相對均勻的重新分區,可以改善數據傾斜的問題
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
//repartition方法一定shuffle
//不論將分區數量變多、變少、或不變,都shuffle
val rdd2 = rdd1.repartition(3)
reparation的底層調用的是coalesce,shuffle = true
coalesce(numPartitions, shuffle = true)
coalesce
coalesce可以shuffle,也可以不shuffle,如果將分區數量減少,並且shuffle = false,就是將分區進行合併
- shuffle = true
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
//shuffle = true
val rdd2 = rdd1.coalesce(3, true)
//與repartition(3)功能一樣
- shuffle = false
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)
//shuffle = false
val rdd2 = rdd1.coalesce(2, false)
cogroup
協同分組,即將多個RDD中對應的數據,使用相同的分區器(HashPartitioner),將來自多個RDD中的key相同的數據通過網路傳入到同一臺機器的同一個分區中(與groupByKey、groupBy區別是,groupByKey、groupBy只能對一個RDD進行分組)
註意:調用cogroup方法,兩個RDD中對應的數據都必須是對偶元組類型,並且key類型一定相同
//通過並行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2), ("jerry", 4)), 3)
//通過並行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//將兩個RDD都進行分組
val grouped: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
join
兩個RDD進行join,相當於SQL中的內關聯join
兩個RDD為什麼要進行jion?想要的數據來自於兩個數據集,並且兩個數據集的數據存在相同的條件,必須關聯起來才能得到想要的全部數據
//通過並行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過並行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Int, Double))] = rdd1.join(rdd2)
leftOuterJoin
左外連接,相當於SQL中的左外關聯
//通過並行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過並行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
rightOuterJoin
右外連接,相當於SQL中的右外關聯
//通過並行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過並行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
fullOuterJoin
全連接,相當於SQL中的全關聯
//通過並行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過並行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
intersection
求交集,底層使用的是cogroup實現的
val rdd1 = sc.parallelize(List(1,2,3,4,4,6), 2)
val rdd2 = sc.parallelize(List(3,4,5,6,7,8), 2)
//求交集
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//使用cogroup實現intersection的功能
val rdd11 = rdd1.map((_, null))
val rdd22 = rdd2.map((_, null))
val rdd33: RDD[(Int, (Iterable[Null], Iterable[Null]))] = rdd11.cogroup(rdd22)
val rdd44: RDD[Int] = rdd33.filter { case (_, (it1, it2)) => it1.nonEmpty && it2.nonEmpty }.keys
subtract
求兩個RDD的差集,將第一個RDD中的數據,如果在第二個RDD中出現了,就從第一個RDD中移除
val rdd1 = sc.parallelize(List("A", "B", "C", "D", "E"))
val rdd2 = sc.parallelize(List("A", "B"))
val rdd3: RDD[String] = rdd1.subtract(rdd2)
//返回 C D E
cartesian
笛卡爾積
val rdd1 = sc.parallelize(List("tom", "jerry"), 2)
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"), 3)
val rdd3 = rdd1.cartesian(rdd2)