7. 圖的聚合操作 圖的聚合操作主要的方法有: (1) Graph.mapReduceTriplets():該方法有一個mapFunc和一個reduceFunc,mapFunc對圖中的每一個EdgeTriplet進行處理,生成一個或者多個消息,並且將這些消息發送個Edge的一個或者兩個頂點,redu ...
7. 圖的聚合操作
圖的聚合操作主要的方法有:
(1) Graph.mapReduceTriplets():該方法有一個mapFunc和一個reduceFunc,mapFunc對圖中的每一個EdgeTriplet進行處理,生成一個或者多個消息,並且將這些消息發送個Edge的一個或者兩個頂點,reduceFunc對發送到每一個頂點上的消息進行合併,生成最終的消息,最後返回一個VertexRDD(不包括沒有收到消息的頂點);
(2) Graph.pregel():該方法採用BSP模型,包括三個函數vprog、sendMsg和mergeMsg,vprog是運行在每個節點上的頂點更新函數,接收消息,然後對頂點屬性更新,sendMsg生成發送給下一次迭代的消息,mergeMsg對同一個頂點接收到的多個消息進行合併,迭代一直進行到收斂,或者達到了設置的最大迭代次數為止。
代碼:
// 聚合操作 println("*************************************************************") println("聚合操作") println("*************************************************************") println("找出年紀最大的追求者:") val oldestFollower:VertexRDD[(String,Int)] = userGraph.mapReduceTriplets[(String,Int)]( // 將源頂點的屬性發送給目標頂點,map過程 edge => Iterator((edge.dstId,(edge.srcAttr.name,edge.srcAttr.age))), // 得到最大追求者,reduce過程 (a,b) => if(a._2>b._2) a else b ) userGraph.vertices.leftJoin(oldestFollower){(id,user,optOldestFollower) => optOldestFollower match{ case None => s"${user.name} does not have any followers." case Some(oldestAge) => s"The oldest age of ${user.name} \'s followers is ${oldestAge._2}(${oldestAge._1})." } }.collect.foreach{case(id,str) => println(str)} println // 找出追求者的平均年齡 println("找出追求者的平均年齡:") val averageAge:VertexRDD[Double] = userGraph.mapReduceTriplets[(Int,Double)]( // 將源頂點的屬性(1,Age)發送給目標頂點,map過程 edge => Iterator((edge.dstId,(1,edge.srcAttr.age.toDouble))), // 得到追求者的數量和總年齡 (a,b) => ((a._1+b._1),(a._2+b._2)) ).mapValues((id,p) => p._2/p._1) userGraph.vertices.leftJoin(averageAge){(id,user,optAverageAge) => optAverageAge match{ case None => s"${user.name} does not have any followers." case Some(avgAge) => s"The average age of ${user.name} \'s followers is $avgAge." } }.collect.foreach{case(id,str) => println(str)} println // 聚合操作2 println("*************************************************************") println("聚合操作2") println("*************************************************************") println("找出3到各頂點的最短距離:") // 定義源點 val sourceId:VertexId = 3L val initialGraph = graph.mapVertices((id,_) => if(id==sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( (id,dist,newDist) => math.min(dist,newDist), // 權重計算 triplet=>{ if(triplet.srcAttr + triplet.attr < triplet.dstAttr){ Iterator((triplet.dstId, triplet.srcAttr+triplet.attr)) } else{ Iterator.empty } }, // 最短距離 (a,b) => math.min(a,b) ) println(sssp.vertices.collect.mkString("\n"))
運行結果:
*************************************************************
聚合操作
*************************************************************
找出年紀最大的追求者:
The oldest age of Peter 's followers is 27(Henry).
The oldest age of Kate 's followers is 55(Charlie).
The oldest age of Henry 's followers is 55(Charlie).
The oldest age of Alice 's followers is 32(Peter).
The oldest age of Charlie 's followers is 35(Mike).
Mike does not have any followers.
找出追求者的平均年齡:
The average age of Peter 's followers is 27.0.
The average age of Kate 's followers is 45.0.
The average age of Henry 's followers is 45.0.
The average age of Alice 's followers is 29.5.
The average age of Charlie 's followers is 35.0.
Mike does not have any followers.
*************************************************************
聚合操作2
*************************************************************
找出3到各頂點的最短距離:
(4,9.0)
(6,3.0)
(2,7.0)
(1,10.0)
(3,0.0)
(5,Infinity)