import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext} object Transformation { def main(args: Array[String]): U ...
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
object Transformation { def main(args: Array[String]): Unit = { val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Transformation") val sc = new SparkContext(config) val listRDD = sc.makeRDD(1 to 10) val listRDD2 = sc.makeRDD(Array(List(1, 2), List(3, 4))) val listRDD3 = sc.makeRDD(5 to 14) /***************************單value*****************************/ /** * map(func) * 每次處理1條數據 */ // val mapRDD = listRDD.map(_ * 2) /** * mapPartitions(func) * 每次處理一組分區數據,效率高,但可能出現記憶體溢出(因為處理完一組分區後再釋放) */ // val mapPartitionsRDD = listRDD.mapPartitions(datas=>{ // datas.map(data => data * 2) // }) /** * mapPartitionsWithIndex(func) * 函數的輸入多了分區號 */ // val tupleRDD: RDD[(Int, String)] = listRDD.mapPartitionsWithIndex { // case (num, datas) => { // datas.map((_, " 分區號:" + num)) // } // } /** * flatMap(func) * 將map後的數據扁平 */ // val flatMAPRDD: RDD[Int] = listRDD2.flatMap(datas => datas) /** * glom() * 將一個分區的數據放在一個數組裡 */ // val glomRDD: RDD[Array[Int]] = listRDD.glom() /** * groupBy(func) * 按照函數的返回值進行分組,分組後的數據(K:分組的key,V:分組的集合) */ // val groupByRDD: RDD[(Int, Iterable[Int])] = listRDD.groupBy(i => i%2) // groupByRDD.collect().foreach(println) /** * filter(func) * 按照返回值為true的過濾 */ // val filterRDD: RDD[Int] = listRDD.filter(x => x % 2 ==0) // filterRDD.collect().foreach(println) /** * sample(withReplacement : scala.Boolean, fraction : scala.Double, seed : scala.Long) * 隨機抽樣 */ // val sampleRDD: RDD[Int] = listRDD.sample(false, 0.4, 1) // sampleRDD.collect().foreach(println) /** * distinct() * 去重,且去重後會shuffler,可以指定去重後的分區數 */ // val distinctRDD: RDD[Int] = listRDD.distinct() // distinctRDD.collect().foreach(println) /** * coalesce(n) * 縮減分區的數量,可以簡單的理解為合併分區,預設,沒有shuffler,可以加參數true指定shuffler */ // println("縮減分區前 = " + listRDD.partitions.size) // val coalesceRDD: RDD[Int] = listRDD.coalesce(2) // println("縮減分區前 = " + coalesceRDD.partitions.size) /** * repartition() * 重新分區,有shuffler。它其實就是帶true的coalesce */ // listRDD.glom().collect().foreach(arrays => { // println(arrays.mkString(",")) // }) // val repartitionRDD: RDD[Int] = listRDD.repartition(2) // repartitionRDD.glom().collect().foreach(arrays => { // println(arrays.mkString(",")) // }) /** * sortBy(f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)) * 根據函數排序 */ // val sortByRDD: RDD[Int] = listRDD.sortBy(n => n % 2, false) // sortByRDD.collect().foreach(println) /**************************雙value交互*****************************/ /** * 雙value交互 * A.union(B) 對A、B合併。(不去重) * A.subtract(B) 對A減去和B中的相同的 * A.cartesian(B) 對A、B求笛卡爾乘積 * A.zip(B) 將A、B組成(k,v),個數、分區數要相等 * A.union(B) 對A、B求並集 */ // listRDD.union(listRDD3).collect().foreach(println) // listRDD.subtract(listRDD3).collect().foreach(println) // listRDD.intersection(listRDD3).collect().foreach(println) // listRDD.cartesian(listRDD3).collect().foreach(println) // listRDD.zip(listRDD3).collect().foreach(println) /**************************(k,v)對*******************************/ val pairRDD1: RDD[(Int, String)] = sc.parallelize(Array((1, "aa"), (1, "bb"), (3, "cc"), (3, "dd")), 4) val pairRDD2: RDD[(String, Int)] = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2) val pairRDD3: RDD[(Int, String)] = sc.parallelize(Array((1, "zzz"), (3, "xxx"))) /** * partitionBy(partitioner: Partitioner) * 按照分區器進行分區 */ // pairRDD1.partitionBy(new org.apache.spark.HashPartitioner(2)) // .glom().collect().foreach(arrays => { // println(arrays.mkString(",")) // }) // pairRDD1.partitionBy(new MyPartitioner(3)) // .glom().collect().foreach(arrays => { // println(arrays.mkString(",")) // }) /** * groupByKey() * 單純把key相等的value放在一起,生成序列 */ // pairRDD1.groupByKey().collect().foreach(println) /** * reduceByKey(func) * 按key聚合,並且按函數對key相等的value進行操作 */ // pairRDD1.reduceByKey(_ + _) // .glom().collect().foreach(arrays => { // println(arrays.mkString(",")) // }) /** * aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U) * zeroValue:每個分區的每一個key的初始值 * seqOp:每個分區里的聚合函數 * seqOp:分區間的聚合函數 */ // 取出每個分區相同對key的最大值,在相加 // pairRDD2.aggregateByKey(0)(math.max(_,_), _+_) // .glom().collect().foreach(arrays => { // println(arrays.mkString(",")) // }) /** * foldByKey(zeroValue: V)(func: (V, V) => V) * 其實就是aggregateByKey的簡化版,seqOp和seqOp相同 */ // pairRDD2.foldByKey(0)(_ + _) // .glom().collect().foreach(arrays => { // println(arrays.mkString(",")) // }) /** * combineByKey[C]( * createCombiner: V => C, * mergeValue: (C, V) => C, * mergeCombiners: (C, C) => C, * partitioner: Partitioner, * mapSideCombine: Boolean = true, * serializer: Serializer = null) * * 主要就是比aggregateByKey多了一個createCombiner,用於計算初始值 */ // 計算相同key的value的均值 // pairRDD2.combineByKey( // (_, 1), // (acc:(Int, Int), v) => (acc._1 + v, acc._2 + 1), // (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) // .map{case (key, value) => (key, value._1 / value._2.toDouble)} // .collect().foreach(println) /** * sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) * 按key排序 */ // pairRDD1.sortByKey(true) // .collect().foreach(println) /** * mapValues(func) * 只對value做轉換 */ // pairRDD1.mapValues(value => value + "|||") // .collect().foreach(println) /** * A.join(B, numP) * 把key相同的value組合在一起(性能較低) */ // pairRDD1.join(pairRDD3) // .collect().foreach(println) /** * A.cogroup(B) * (k, v1) 和 (k, v2)cogroup 後,得到(k, v1集合,v2集合) */ pairRDD1.cogroup(pairRDD3) .collect().foreach(println) sc.stop() } } // 自定義分區器 class MyPartitioner (partitions: Int) extends Partitioner { override def numPartitions: Int = { partitions } override def getPartition(key: Any): Int = { 1 } }
//只寫代碼不讓我發出來--忽略這一行