經過一段時間的學習,對於Hadoop有了一些瞭解,於是決定用MapReduce實現PageRank演算法,以下簡稱PR 先簡單介紹一下PR演算法(摘自百度百科:https://baike.baidu.com/item/google%20pagerank/2465380?fr=aladdin&fromid ...
經過一段時間的學習,對於Hadoop有了一些瞭解,於是決定用MapReduce實現PageRank演算法,以下簡稱PR
先簡單介紹一下PR演算法(摘自百度百科:https://baike.baidu.com/item/google%20pagerank/2465380?fr=aladdin&fromid=111004&fromtitle=pagerank):
PageRank讓鏈接來"投票" 一個頁面的“得票數”由所有鏈向它的頁面的重要性來決定,到一個頁面的超鏈接相當於對該頁投一票。一個頁面的PageRank是由所有鏈向它的頁面(“鏈入頁面”)的重要性經過遞歸演算法得到的。一個有較多鏈入的頁面會有較高的等級,相反如果一個頁面沒有任何鏈入頁面,那麼它沒有等級。2005年初,Google為網頁鏈接推出一項新屬性nofollow,使得網站管理員和網站作者可以做出一些Google不計票的鏈接,也就是說這些鏈接不算作"投票"。nofollow的設置可以抵制評論垃圾。 假設一個由4個頁面組成的小團體:A,B,C和D。如果所有頁面都鏈向A,那麼A的PR(PageRank)值將是B,C及D的Pagerank總和。 繼續假設B也有鏈接到C,並且D也有鏈接到包括A的3個頁面。一個頁面不能投票2次。所以B給每個頁面半票。以同樣的邏輯,D投出的票只有三分之一算到了A的PageRank上。 換句話說,根據鏈出總數平分一個頁面的PR值。 最後,所有這些被換算為一個百分比再乘上一個繫數。由於“沒有向外鏈接的頁面”傳遞出去的PageRank會是0,所以,Google通過數學系統給了每個頁面一個最小值: 說明:在Sergey Brin和Lawrence Page的1998年原文中給每一個頁面設定的最小值是1-d,而不是這裡的 (1-d)/N。 所以一個頁面的PageRank是由其他頁面的PageRank計算得到。Google不斷的重覆計算每個頁面的PageRank。如果給每個頁面一個隨機PageRank值(非0),那麼經過不斷的重覆計算,這些頁面的PR值會趨向於穩定,也就是收斂的狀態。這就是搜索引擎使用它的原因。 通過以上文字,可以總結出以下幾點: 1.PR中每個頁面都需要需要一個初始值 2.PR演算法是一個趨於收斂的無限迴圈,因此需要一個條件來確定收斂完畢 一般而言收斂條件有以下三種情況:
1、每個頁面的PR值和上一次計算的PR相等
2、設定一個差值指標(0.0001)。當所有頁面和上一次計算的PR差值平均小於該標準時,則收斂。
3、設定一個百分比(99%),當99%的頁面和上一次計算的PR相等
本文將採用第二種方式來實現該演算法:
首先定義一個初始互聯網環境,如下圖所示: 轉化為文件則內容如下:A B D
B C
C A B
D B C
其中每一行的後面的頁面為第一個頁面的出鏈(A可以鏈到B和C)
由於需要統計每個頁面的入鏈頁面和出鏈數,因此需要兩個MapReduce,第一個用於統計入鏈和出鏈,第二個用於迴圈統計PR值,代碼如下:
package com.tyx.mapreduce.PageRank; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by 恆安 on 2017/11/29. */ public class RunPageRankJob { // 統計所有鏈接的入鏈 private static Map<String,String > allInLine = new HashMap<>(); // 統計所有鏈接的出鏈 private static Map<String,Integer> allOutLine = new HashMap<>(); // 統計所有鏈接的現有pagerank private static Map<String,Double> allPageRank = new HashMap<>(); // 統計所有鏈接計算後的pagerank private static Map<String ,Double> allNextPageRank = new HashMap<>(); public static void main(String[] args) { Configuration configuration = new Configuration(); // configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," "); configuration.set("fs.defaultFS", "hdfs://node1:8020"); configuration.set("yarn.resourcemanager.hostname", "node1"); // 第一個MapReduce為了統計出每個頁面的入鏈,和每個頁面的出鏈數 if (run1(configuration)){ run2(configuration); } } /* 輸入數據: A B D B C C A B D B C*/ static class AcountOutMapper extends Mapper<Text,Text,Text,Text>{ @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { // super.map(key, value, context); int num = 0; // 若A能連接到B,則說明B是A的一條出鏈 String[] outLines = value.toString().split("\t"); for (int i=0;i<outLines.length;i++){ context.write(new Text(outLines[i]),key); } num = outLines.length; // 統計出鏈 context.write(key,new Text("--"+num)); } } static class AcountOutReducer extends Reducer<Text,Text,Text,Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // super.reduce(key, values, context); // 統計該頁面的入鏈頁面 String outStr = ""; int sum = 0; for (Text text : values){ // 統計出鏈數目 if (text.toString().contains("--")){ sum += Integer.parseInt(text.toString().replaceAll("--","")); }else { outStr += text+"\t"; } } context.write(key,new Text(outStr+sum)); allOutLine.put(key.toString(),sum); allInLine.put(key.toString(),outStr); allPageRank.put(key.toString(),1.0); } } public static boolean run1(Configuration configuration){ try { Job job = Job.getInstance(configuration); FileSystem fileSystem = FileSystem.get(configuration); job.setJobName("acountline"); job.setJarByClass(RunPageRankJob.class); job.setMapperClass(AcountOutMapper.class); job.setReducerClass(AcountOutReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(KeyValueTextInputFormat.class); Path intPath = new Path("/usr/output/pagerank.txt"); FileInputFormat.addInputPath(job,intPath); Path outPath = new Path("/usr/output/acoutline"); if (fileSystem.exists(outPath)){ fileSystem.delete(outPath,true); } FileOutputFormat.setOutputPath(job,outPath); boolean f = job.waitForCompletion(true); return f; } catch (Exception e) { e.printStackTrace(); } return false; } /*第一次MapReduce輸出數據: A C B A C D C B D D A*/ static class PageRankMapper extends Mapper<Text,Text,Text,Text>{ @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { // super.map(key, value, context); String myUrl = key.toString(); // 取出該頁面所有的入鏈頁面 String inLines = allInLine.get(myUrl); context.write(key,new Text(inLines)); } } static class PageRankReducer extends Reducer<Text,Text,Text,Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // super.reduce(key, values, context); // 後半段求和公式的和 (PR(1)/L(1)…………PR(i)/L(i) double sum = 0.0; String outStr = ""; for (Text text : values){ String[] arr = text.toString().split("\t"); for (int i=0;i<arr.length;i++){ outStr += arr[i]+"\t"; sum += allPageRank.get(arr[i])/allOutLine.get(arr[i]); } } // 算出該頁面本次的PR結果 double nowPr = (1-0.85)/allPageRank.size()+0.85*sum; allNextPageRank.put(key.toString(),nowPr); context.write(key,new Text(outStr)); } } public static void run2(Configuration configuration){ double d = 0.001; int i=1; // 迭代迴圈趨於收斂 while (true){ try { configuration.setInt("count",i); i++; Job job = Job.getInstance(configuration); FileSystem fileSystem = FileSystem.get(configuration); job.setJobName("pagerank"); job.setJarByClass(RunPageRankJob.class); job.setJobName("Pr"+i); job.setMapperClass(PageRankMapper.class); job.setReducerClass(PageRankReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(KeyValueTextInputFormat.class); Path intPath = new Path("/usr/output/pagerank.txt"); if (i>2){ intPath = new Path("/usr/output/Pr"+(i-1)); } FileInputFormat.addInputPath(job,intPath); Path outPath = new Path("/usr/output/Pr"+i); if (fileSystem.exists(outPath)){ fileSystem.delete(outPath,true); } FileOutputFormat.setOutputPath(job,outPath); boolean f = job.waitForCompletion(true); if (f){ System.out.println("job執行完畢"); double sum = 0.0; // 提取本輪所有頁面的PR值和上一輪作比較, for (String key : allPageRank.keySet()){ System.out.println(key+"--------------------------"+allPageRank.get(key)); sum += Math.abs(allNextPageRank.get(key)-allPageRank.get(key)); allPageRank.put(key,allNextPageRank.get(key)); } System.out.println(sum); // 若平均差小於d則表示收斂完畢 if (sum/allPageRank.size()<d){ break; } } } catch (Exception e) { e.printStackTrace(); } } } }
最終結果輸入如下:
由圖可知在這四個頁面組成的互聯網集群中,頁面C的重要性是最高的
本次操作一共經過了30次迴圈:
若有不對之處請不吝指教,謝謝