aggregateByKey 這個RDD有點繁瑣,整理一下使用示例,供參考 直接上代碼 輸出結果說明: 參考代碼及下麵的說明進行理解 官網的說明 源碼中函數的說明 ...
aggregateByKey 這個RDD有點繁瑣,整理一下使用示例,供參考
直接上代碼
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkContext, SparkConf} /** * Created by Edward on 2016/10/27. */ object AggregateByKey { def main(args: Array[String]) { val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey") .setMaster("local") val sc: SparkContext = new SparkContext(sparkConf) val data = List((1, 3), (1, 2), (1, 4), (2, 3)) var rdd = sc.parallelize(data,2)//數據拆分成兩個分區 //合併在不同partition中的值,a,b的數據類型為zeroValue的數據類型 def comb(a: String, b: String): String = { println("comb: " + a + "\t " + b) a + b } //合併在同一個partition中的值, a的數據類型為zeroValue的數據類型,b的數據類型為原value的數據類型 def seq(a: String, b: Int): String = { println("seq: " + a + "\t " + b) a + b } rdd.foreach(println)
//zeroValue 中立值,定義返回value的類型,並參與運算 //seqOp 用來在一個partition中合併值的 //comb 用來在不同partition中合併值的 val aggregateByKeyRDD: RDD[(Int, String)] = rdd.aggregateByKey("100")(seq,comb) //列印輸出 aggregateByKeyRDD.foreach(println) sc.stop() } }
輸出結果說明:
/* 將數據拆分成兩個分區 //分區一數據 (1,3) (1,2) //分區二數據 (1,4) (2,3) //分區一相同key的數據進行合併 seq: 100 3 //(1,3)開始和中立值進行合併 合併結果為 1003 seq: 1003 2 //(1,2)再次合併 結果為 10032 //分區二相同key的數據進行合併 seq: 100 4 //(1,4) 開始和中立值進行合併 1004 seq: 100 3 //(2,3) 開始和中立值進行合併 1003 將兩個分區的結果進行合併 //key為2的,只在一個分區存在,不需要合併 (2,1003) (2,1003) //key為1的, 在兩個分區存在,並且數據類型一致,合併 comb: 10032 1004 (1,100321004) * */
參考代碼及下麵的說明進行理解
官網的說明
源碼中函數的說明
/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/