Hadoop實戰訓練————MapReduce實現PageRank演算法

来源:http://www.cnblogs.com/liuxiaopang/archive/2017/11/30/7930508.html
-Advertisement-
Play Games

經過一段時間的學習,對於Hadoop有了一些瞭解,於是決定用MapReduce實現PageRank演算法,以下簡稱PR 先簡單介紹一下PR演算法(摘自百度百科:https://baike.baidu.com/item/google%20pagerank/2465380?fr=aladdin&fromid ...


經過一段時間的學習,對於Hadoop有了一些瞭解,於是決定用MapReduce實現PageRank演算法,以下簡稱PR

先簡單介紹一下PR演算法(摘自百度百科:https://baike.baidu.com/item/google%20pagerank/2465380?fr=aladdin&fromid=111004&fromtitle=pagerank):

PageRank讓鏈接來"投票" 一個頁面的“得票數”由所有鏈向它的頁面的重要性來決定,到一個頁面的超鏈接相當於對該頁投一票。一個頁面的PageRank是由所有鏈向它的頁面(“鏈入頁面”)的重要性經過遞歸演算法得到的。一個有較多鏈入的頁面會有較高的等級,相反如果一個頁面沒有任何鏈入頁面,那麼它沒有等級。
  2005年初,Google為網頁鏈接推出一項新屬性nofollow,使得網站管理員和網站作者可以做出一些Google不計票的鏈接,也就是說這些鏈接不算作"投票"。nofollow的設置可以抵制評論垃圾。 假設一個由4個頁面組成的小團體:A,B,C和D。如果所有頁面都鏈向A,那麼A的PR(PageRank)值將是B,C及D的Pagerank總和。 繼續假設B也有鏈接到C,並且D也有鏈接到包括A的3個頁面。一個頁面不能投票2次。所以B給每個頁面半票。以同樣的邏輯,D投出的票只有三分之一算到了A的PageRank上。 換句話說,根據鏈出總數平分一個頁面的PR值。 最後,所有這些被換算為一個百分比再乘上一個繫數。由於“沒有向外鏈接的頁面”傳遞出去的PageRank會是0,所以,Google通過數學系統給了每個頁面一個最小值: 說明:在Sergey Brin和Lawrence Page的1998年原文中給每一個頁面設定的最小值是1-d,而不是這裡的 (1-d)/N。 所以一個頁面的PageRank是由其他頁面的PageRank計算得到。Google不斷的重覆計算每個頁面的PageRank。如果給每個頁面一個隨機PageRank值(非0),那麼經過不斷的重覆計算,這些頁面的PR值會趨向於穩定,也就是收斂的狀態。這就是搜索引擎使用它的原因。   通過以上文字,可以總結出以下幾點: 1.PR中每個頁面都需要需要一個初始值 2.PR演算法是一個趨於收斂的無限迴圈,因此需要一個條件來確定收斂完畢 一般而言收斂條件有以下三種情況:

1、每個頁面的PR值和上一次計算的PR相等

2、設定一個差值指標(0.0001)。當所有頁面和上一次計算的PR差值平均小於該標準時,則收斂。

3、設定一個百分比(99%),當99%的頁面和上一次計算的PR相等

 

本文將採用第二種方式來實現該演算法:

首先定義一個初始互聯網環境,如下圖所示: 轉化為文件則內容如下:

A B D
B C
C A B
D B C

其中每一行的後面的頁面為第一個頁面的出鏈(A可以鏈到B和C)

由於需要統計每個頁面的入鏈頁面和出鏈數,因此需要兩個MapReduce,第一個用於統計入鏈和出鏈,第二個用於迴圈統計PR值,代碼如下:

package com.tyx.mapreduce.PageRank;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by 恆安 on 2017/11/29.
 */
public class RunPageRankJob {

//    統計所有鏈接的入鏈
    private static Map<String,String > allInLine = new HashMap<>();
//    統計所有鏈接的出鏈
    private static Map<String,Integer> allOutLine = new HashMap<>();
//    統計所有鏈接的現有pagerank
    private static Map<String,Double> allPageRank = new HashMap<>();
//    統計所有鏈接計算後的pagerank
    private static Map<String ,Double> allNextPageRank = new HashMap<>();

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
//            configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," ");
        configuration.set("fs.defaultFS", "hdfs://node1:8020");
        configuration.set("yarn.resourcemanager.hostname", "node1");
//        第一個MapReduce為了統計出每個頁面的入鏈,和每個頁面的出鏈數
        if (run1(configuration)){
            run2(configuration);
        }
    }

    /*
    輸入數據:
    A    B    D
    B    C
    C    A    B
    D    B    C*/

    static class AcountOutMapper extends Mapper<Text,Text,Text,Text>{
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
//            super.map(key, value, context);
            int num = 0;
//            若A能連接到B,則說明B是A的一條出鏈
            String[] outLines = value.toString().split("\t");
            for (int i=0;i<outLines.length;i++){
                context.write(new Text(outLines[i]),key);
            }
            num = outLines.length;
//            統計出鏈
            context.write(key,new Text("--"+num));
        }
    }

    static class AcountOutReducer extends Reducer<Text,Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//            super.reduce(key, values, context);
//            統計該頁面的入鏈頁面
            String outStr = "";
            int sum = 0;
            for (Text text : values){
//                統計出鏈數目
                if (text.toString().contains("--")){
                    sum += Integer.parseInt(text.toString().replaceAll("--",""));
                }else {
                    outStr += text+"\t";
                }
            }
            context.write(key,new Text(outStr+sum));
            allOutLine.put(key.toString(),sum);
            allInLine.put(key.toString(),outStr);
            allPageRank.put(key.toString(),1.0);
        }
    }

    public static boolean run1(Configuration configuration){
        try {
            Job job = Job.getInstance(configuration);
            FileSystem fileSystem = FileSystem.get(configuration);

            job.setJobName("acountline");

            job.setJarByClass(RunPageRankJob.class);

            job.setMapperClass(AcountOutMapper.class);
            job.setReducerClass(AcountOutReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);

            Path intPath = new Path("/usr/output/pagerank.txt");
            FileInputFormat.addInputPath(job,intPath);


            Path outPath = new Path("/usr/output/acoutline");
            if (fileSystem.exists(outPath)){
                fileSystem.delete(outPath,true);
            }
            FileOutputFormat.setOutputPath(job,outPath);

            boolean f = job.waitForCompletion(true);
            return f;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /*第一次MapReduce輸出數據:
    A    C
    B    A   C   D
    C    B   D
    D    A*/

    static class PageRankMapper extends Mapper<Text,Text,Text,Text>{
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
//            super.map(key, value, context);
            String myUrl = key.toString();
//            取出該頁面所有的入鏈頁面
            String inLines = allInLine.get(myUrl);
            context.write(key,new Text(inLines));
        }
    }

    static class PageRankReducer extends Reducer<Text,Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//            super.reduce(key, values, context);
//            後半段求和公式的和 (PR(1)/L(1)…………PR(i)/L(i)
            double sum = 0.0;
            String outStr = "";
            for (Text text : values){
                String[] arr = text.toString().split("\t");
                for (int i=0;i<arr.length;i++){
                    outStr += arr[i]+"\t";
                    sum += allPageRank.get(arr[i])/allOutLine.get(arr[i]);
                }
            }
//            算出該頁面本次的PR結果
            double nowPr = (1-0.85)/allPageRank.size()+0.85*sum;

            allNextPageRank.put(key.toString(),nowPr);
            context.write(key,new Text(outStr));
        }
    }

    public static void run2(Configuration configuration){
        double d = 0.001;
        int i=1;
//        迭代迴圈趨於收斂
        while (true){
            try {
                configuration.setInt("count",i);
                i++;
                Job job = Job.getInstance(configuration);
                FileSystem fileSystem = FileSystem.get(configuration);

                job.setJobName("pagerank");
                job.setJarByClass(RunPageRankJob.class);
                job.setJobName("Pr"+i);

                job.setMapperClass(PageRankMapper.class);
                job.setReducerClass(PageRankReducer.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);

                job.setInputFormatClass(KeyValueTextInputFormat.class);

                Path intPath = new Path("/usr/output/pagerank.txt");
                if (i>2){
                    intPath = new Path("/usr/output/Pr"+(i-1));
                }
                FileInputFormat.addInputPath(job,intPath);


                Path outPath = new Path("/usr/output/Pr"+i);
                if (fileSystem.exists(outPath)){
                    fileSystem.delete(outPath,true);
                }
                FileOutputFormat.setOutputPath(job,outPath);

                boolean f = job.waitForCompletion(true);
                if (f){
                    System.out.println("job執行完畢");
                    double sum = 0.0;
//                    提取本輪所有頁面的PR值和上一輪作比較,
                    for (String key : allPageRank.keySet()){
                        System.out.println(key+"--------------------------"+allPageRank.get(key));
                        sum += Math.abs(allNextPageRank.get(key)-allPageRank.get(key));
                        allPageRank.put(key,allNextPageRank.get(key));
                    }
                    System.out.println(sum);
//                    若平均差小於d則表示收斂完畢
                    if (sum/allPageRank.size()<d){
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

最終結果輸入如下:

由圖可知在這四個頁面組成的互聯網集群中,頁面C的重要性是最高的

本次操作一共經過了30次迴圈:

 

若有不對之處請不吝指教,謝謝


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 在點擊進入地圖的入口(下麵數據是vue渲染的數據) <a class="navigation" v-if="merchant.longitude && merchant.latitude" href="http://api.map.baidu.com/marker?location={{mercha ...
  • 同源策略 同源策略(Same origin policy)是一種約定,它是瀏覽器最核心也最基本的安全功能,如果缺少了同源策略,則瀏覽器的正常功能可能都會受到影響。可以說Web是構建在同源策略基礎之上的,瀏覽器只是針對同源策略的一種實現。 同源策略,它是由Netscape提出的一個著名的安全策略。現在 ...
  • 1 USE [Test] 2 GO 3 SET ANSI_NULLS ON 4 GO 5 SET QUOTED_IDENTIFIER ON 6 GO 7 --@column 表示欄位或者常量,@paddingChar 表示 補位字元, @len 補位數量, @returnStr 8 create f... ...
  • OLAP也稱決策支持系統(Decision Support System,DSS),是數據倉庫系統的主要應用形式,使分析人員、管理人員或執行人員能夠從多種角度對從原始數據中轉化出來的、能夠真正為用戶所理解的、並真實反映企業維特性的信息進行快速、一致、交互地存取,從而獲得對數據的更深入瞭解的一類軟體技... ...
  • 北京賽車的玩法豐富多樣,但其實都是很簡單,只要用心理解。根本不成問題多看走勢圖,不是看一兩次。而是每天抽空閑的時間去留意。方便藉助參考。懂得看走勢會操作的讓人才能在彩界如水的魚。如果你不行,並不代表我不行。給你自己一個機會。只要你敢跟我,我絕對會讓你今天所做出的決定而感到欣慰,我沒有華麗的語言一切靠 ...
  • 原公司用的資料庫是Oracle和MySQL居多,寫的SQL語句也比較少,有些生疏了。現在的公司使用的DB2資料庫,完全沒接觸過,導致一些函數的使用要在網上搜索案例,現在總結一點DB2的函數使用方法。 正確需求:查詢出指定日期的工作日,頁面傳一個天數,並返回一個新的日期。 下麵是時間表欄位: 剛開始項 ...
  • 表名:products 欄位:product_id、product_name、product_price、vend_id(供應商) 12.1聚集函數: 我們常常需要彙總數據,而不是把數據檢索出來,MySQL提供了專門的函數。 檢索例子: 確定表中行數 獲得表中行組的和 找出表列 MySQL提供了5個 ...
  • 一、kafka安裝(集群模式) 1、安裝前準備 機器:10.199.240.232,10.199.206.20 kafka版本:2.12-0.10.1.1 下載地址; https://mirrors.tuna.tsinghua.edu.cn/apache/ 相關目錄: /apps/svr/kafka ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...