1. Aggregate Aggregate即聚合操作。直接上代碼: acc即(0,0),number即data,seqOp將data的值累加到Tuple的第一個元素,將data的個數累加到Tuple的第二個元素。由於沒有分區,所以combOp是不起作用的,這個例子裡面即使分區了,combOp起作用 ...
1. Aggregate
Aggregate即聚合操作。直接上代碼:
import org.apache.spark.{SparkConf, SparkContext} object AggregateTest { def main(args:Array[String]) = { // 設置運行環境 val conf = new SparkConf().setAppName("Aggregate Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar")) val sc = new SparkContext(conf) var data = List(2,5,8,1,2,6,9,4,3,5) var res = data.par.aggregate((0,0))( // seqOp (acc, number) => (acc._1+number, acc._2+1), // combOp (par1, par2) => (par1._1+par2._1, par1._2+par2._2) ) println(res) sc.stop } }
acc即(0,0),number即data,seqOp將data的值累加到Tuple的第一個元素,將data的個數累加到Tuple的第二個元素。由於沒有分區,所以combOp是不起作用的,這個例子裡面即使分區了,combOp起作用了,結果也是一樣的。
運行結果:
(45,10)
2. AggregateByKey
AggregateByKey和Aggregate差不多,也是聚合,不過它是根據Key的值來聚合。
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Created by Administrator on 2017/6/13. */ object AggregateByKeyTest { def main(args:Array[String]) = { // 設置運行環境 val conf = new SparkConf().setAppName("AggregateByKey Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar")) val sc = new SparkContext(conf) val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)) val rdd = sc.parallelize(data) val res : RDD[(Int,Int)] = rdd.aggregateByKey(0)( // seqOp math.max(_,_), // combOp _+_ ) res.collect.foreach(println) sc.stop } }
根據Key值的不同,可以分為3個組:
(1) (1,3),(1,2),(1,4);
(2) (2,3);
(3) (3,6),(3,8)。
這3個組分別進行seqOp,也就是(K,V)裡面的V和0進行math.max()運算,運算結果和下一個V繼續運算,以第一個組為例,運算過程是這樣的:
0, 3 => 3
3, 2 => 3
3, 4 => 4
所以最終結果是(1,4)。combOp是對把各分區的V加起來,由於這裡並沒有分區,所以實際上是不起作用的。
運行結果:
(2,3) (1,4) (3,8)
如果生成RDD時分成3個區:
val rdd = sc.parallelize(data,3)
運行結果就變成了:
(3,8) (1,7) (2,3)
這是因為一個分區返回(1,3),另一個分區返回(1,4),combOp將這兩個V加起來,就得到了(1,7)。