MapRedue開發實例

来源:http://www.cnblogs.com/learn21cn/archive/2016/12/04/6130109.html
-Advertisement-
Play Games

一些例子,所用版本為hadoop 2.6.5 1、統計字數 數據格式如下(單詞,頻數,以tab分開): 2、統計用戶在網站的停留時間 數據格式(用戶,毫秒數,網站,以tab分開): 運行:hadoop jar ~/c02mrtest.jar com.mr.test.MRWeb TestData/we ...


一些例子,所用版本為hadoop 2.6.5

1、統計字數

數據格式如下(單詞,頻數,以tab分開):

A    100
B    97
C    98
A 98
 1 package com.mr.test;
 2 
 3 import java.io.IOException;
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.io.IntWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Job;
 9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13 
14 public class MRTest {
15     
16     public static class C01Mapper extends Mapper<Object, Text, Text, IntWritable> {
17         
18         @Override
19         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
20             String[] line = value.toString().split("\t");
21             if(line.length == 2) {
22                 context.write(new Text(line[0]),new IntWritable(Integer.parseInt(line[1])));                
23             }            
24         }
25     }
26     
27     public static class C01Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
28         
29         @Override
30         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
31             int i =0;
32             for(IntWritable value : values){
33                 i += value.get();
34             }
35             context.write(key, new IntWritable(i));
36         }        
37     }    
38 
39     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
40         //參數含義: agrs[0]標識 in, agrs[1]標識 out,agrs[2]標識 unitmb,agrs[3]標識 reducer number,
41         
42         int unitmb =Integer.valueOf(args[2]);
43         String in = args[0];
44         String out = args[1];
45         int nreducer = Integer.valueOf(args[3]);
46         
47         Configuration conf = new Configuration();
48         conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
49         conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
50         conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
51         conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));
52         
53         Job job = new Job(conf);
54         FileInputFormat.addInputPath(job, new Path(in));
55         FileOutputFormat.setOutputPath(job, new Path(out));
56         job.setMapperClass(C01Mapper.class);
57         job.setReducerClass(C01Reducer.class);
58         job.setNumReduceTasks(nreducer);
59         job.setCombinerClass(C01Reducer.class);
60         job.setMapOutputKeyClass(Text.class);
61         job.setMapOutputValueClass(IntWritable.class);
62         job.setOutputKeyClass(Text.class);
63         job.setOutputValueClass(IntWritable.class);
64         job.setJarByClass(MRTest.class);
65         job.waitForCompletion(true);
66     }
67 }

2、統計用戶在網站的停留時間

數據格式(用戶,毫秒數,網站,以tab分開):

A	100	baidu.com
B	900	google.com
C	515	sohu.com
D	618	sina.com
E	791	google.com
B	121	baidu.com
C	915	google.com
D	112	sohu.com
E	628	sina.com
A	681	google.com
C	121	baidu.com
D	215	google.com
E	812	sohu.com
A	128	sina.com
B	291	google.com
  1 package com.mr.test;
  2 
  3 import java.io.IOException;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.Path;
  6 import org.apache.hadoop.io.IntWritable;
  7 import org.apache.hadoop.io.Text;
  8 import org.apache.hadoop.io.WritableComparable;
  9 import org.apache.hadoop.io.WritableComparator;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Partitioner;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 
 17 public class MRWeb {
 18 
 19     public static class C02Mapper extends Mapper<Object, Text, Text, Text> {
 20         @Override
 21         public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
 22             String line[] = value.toString().split("\t");
 23             //格式檢查
 24             if(line.length == 3){
 25                 String name = line[0];
 26                 String time = line[1];
 27                 String website = line[2];    
 28                 context.write(new Text(name + "\t" + time), new Text(time + "\t" + website));
 29             }                
 30         }        
 31     }    
 32     
 33     public static class C02Partitioner extends Partitioner<Text, Text> {
 34         
 35         @Override
 36         public int getPartition(Text key, Text value, int number) {
 37             String name = key.toString().split("\t")[0];
 38             int hash =name.hashCode();    
 39             //以此實現分區
 40             return Math.abs(hash % number);
 41         }
 42         
 43     }
 44     
 45     public static class C02Sort extends WritableComparator {
 46         //必須有的
 47         protected C02Sort() {
 48             super(Text.class,true);            
 49         }
 50         
 51         @Override
 52         public int compare(WritableComparable w1, WritableComparable w2) {
 53             Text h1 = new Text(((Text)w1).toString().split("\t")[0] );
 54             Text h2 = new Text(((Text)w2).toString().split("\t")[0] );
 55             IntWritable m1 =new IntWritable(Integer.valueOf(((Text)w1).toString().split("\t")[1]));
 56             IntWritable m2 =new IntWritable(Integer.valueOf(((Text)w2).toString().split("\t")[1]));
 57             
 58             int result;
 59             if(h1.equals(h2)){
 60                 result = m2.compareTo(m1);
 61             }else {
 62                 result =h1.compareTo(h2);
 63             }    
 64             return result;
 65         }
 66     }
 67     
 68     public  static class C02Group extends WritableComparator{
 69         protected C02Group() {
 70             super(Text.class,true);            
 71         }
 72         @Override
 73         public int compare(WritableComparable w1, WritableComparable w2) {
 74             Text h1 = new Text(((Text)w1).toString().split("\t")[0] );
 75             Text h2 = new Text(((Text)w2).toString().split("\t")[0] );
 76                         
 77             return h1.compareTo(h2);
 78         }        
 79     }
 80     
 81     public static class C02Reducer extends Reducer<Text, Text, IntWritable, Text> {
 82         
 83         @Override
 84         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 85             int count = 0;
 86             String name =key.toString().split("\t")[0];
 87             //分組排序已經做好了,這裡只管列印
 88             for(Text value : values){
 89                 count++;
 90                 StringBuffer buffer = new StringBuffer();
 91                 buffer.append(name);
 92                 buffer.append("\t");
 93                 buffer.append(value.toString());
 94                 context.write(new IntWritable(count), new Text(buffer.toString()));                
 95             }
 96         }
 97     }
 98     
 99     public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
100         //參數含義: agrs[0]標識 in, agrs[1]標識 out,agrs[2]標識 unitmb,agrs[3]標識 reducer number,
101         if(args.length != 4){
102             System.out.println("error");
103             System.exit(0);
104         }
105         
106         int unitmb =Integer.valueOf(args[2]);
107         String in = args[0];
108         String out = args[1];
109         int nreducer = Integer.valueOf(args[3]);
110                 
111         Configuration conf = new Configuration();
112         conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
113         conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
114         conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
115         conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));
116                 
117         Job job = new Job(conf);
118         FileInputFormat.addInputPath(job, new Path(in));
119         FileOutputFormat.setOutputPath(job, new Path(out));
120         job.setMapperClass(C02Mapper.class);
121         job.setReducerClass(C02Reducer.class);
122         job.setNumReduceTasks(nreducer);
123         job.setPartitionerClass(C02Partitioner.class);
124         job.setGroupingComparatorClass(C02Group.class);
125         job.setSortComparatorClass(C02Sort.class);        
126         job.setMapOutputKeyClass(Text.class);
127         job.setMapOutputValueClass(Text.class);
128         job.setOutputKeyClass(IntWritable.class);
129         job.setOutputValueClass(Text.class);
130         job.setJarByClass(MRWeb.class);
131         job.waitForCompletion(true);
132     }
133 }

運行:hadoop jar ~/c02mrtest.jar com.mr.test.MRWeb TestData/webcount.txt /DataWorld/webresult 128 1

結果的樣子:

 

3、json數組分析

數據格式(前面以tab分開):

1	[{"name":"A","age":16,"maths":100}]
2	[{"name":"B","age":17,"maths":97}]
3	[{"name":"C","age":18,"maths":89}]
4	[{"name":"D","age":15,"maths":98}]
5	[{"name":"E","age":19,"maths":100}]
 1 package com.mr.test;
 2 
 3 import java.io.IOException;
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11 import net.sf.json.JSONArray;
12 import net.sf.json.JSONObject;
13 
14 public class MRString {
15     
16     public static class C03Mapper extends Mapper<Object, Text, Text, Text> {
17         @Override
18         protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
19                 throws IOException, InterruptedException {
20             String[] line = value.toString().split("\t");
21             if(line.length ==2){
22                 String c = line[0];
23                 String j = line[1];
24                 JSONArray jsonArray =JSONArray.fromObject(j);
25                 int size = jsonArray.size();
26                 for(int i=0;i<size;i++){
27                     String name = "";
28                     String age = "";
29                     String maths = "";
30                     JSONObject jsonObject =jsonArray.getJSONObject(i);
31                     if(jsonObject.containsKey("name")){
32                         name = jsonObject.getString("name");
33                     }
34                     if(jsonObject.containsKey("age")){
35                         age = jsonObject.getString("age");
36                     }
37                     if(jsonObject.containsKey("maths")){
38                         maths = jsonObject.getString("maths");
39                     }
40                     StringBuffer buffer =new StringBuffer();
41                     buffer.append(name);
42                     buffer.append("\t");
43                     buffer.append(age);
44                     buffer.append("\t");
45                     buffer.append(maths);
46                     context.write(new Text(c), new Text(buffer.toString()));                    
47                 }
48             }            
49         }        
50     }
51 
52     public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
53         //參數含義: agrs[0]標識 in, agrs[1]標識 out,agrs[2]標識 unitmb,agrs[3]
54         if(args.length != 3){
55             System.out.println("error");
56             System.exit(0);
57         }
58         
59         int unitmb =Integer.valueOf(args[2]);
60         String in = args[0];
61         String out = args[1];
62                         
63         Configuration conf = new Configuration();
64         conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
65         conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
66         conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
67         conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));
68                 
69         Job job = new Job(conf);
70         job.addFileToClassPath(new Path("TestData/json-lib-2.4-jdk15.jar"));
71         job.addFileToClassPath(new Path("TestData/ezmorph-1.0.6.jar"));
72         FileInputFormat.addInputPath(job, new Path(in));
73         FileOutputFormat.setOutputPath(job, new Path(out));
74         job.setMapperClass(C03Mapper.class);
75         //沒有reducer的情況下必須設置
76         job.setNumReduceTasks(0);                
77         job.setMapOutputKeyClass(Text.class);
78         job.setMapOutputValueClass(Text.class);
79         job.setOutputKeyClass(Text.class);
80         job.setOutputValueClass(Text.class);
81         job.setJarByClass(MRString.class);
82         job.waitForCompletion(true);
83     }
84 }

運行 hadoop jar ~/c03mrtest.jar com.mr.test.MRString TestData/jsonarray.txt /DataWorld/jsonoutput 128

結果:

這個例子還有一點值得註意(Path中的目錄是HDFS中的目錄):

 job.addFileToClassPath(new Path("TestData/json-lib-2.4-jdk15.jar")); //jar文件下載地址:http://json-lib.sourceforge.net/

 job.addFileToClassPath(new Path("TestData/ezmorph-1.0.6.jar")); //jar文件下載地址:http://ezmorph.sourceforge.net/
使用這兩句,在程式中動態添加了用於json解析的jar文件,而利用伺服器中的ClassPath是訪問不到這兩個文件的。在編程的時候,在windows客戶端下,為了語法書寫方便,導入了json-lib-2.4-jdk15.jar,但是並沒有導入ezmorph-1.0.6.jar

也就是說,可以在程式中動態的加入jar文件,只要知道了它在HDFS中的位置。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 註: 出於職業要求, 本文中所有數字均被人為修改過, 並非真實數字, 很抱歉也不能貼出源代碼 目標: 個險客戶特征分析 背景: 目前市場部使用的推廣活動分析系統只能針對客戶調查返回的信息分析,且僅有年齡/性別/婚姻狀態/收入四個維度, 預測精度不高. 市場部希望能從現有的壽險客戶信息分析出影響他們選 ...
  • 當with和as一起用時,表示定義一個SQL字句 例: with sonword as ( select * from person ) select * from student where name in (select name from sonword) 相當於 select * from ...
  • mysql -u root -pvmware mysql>use mysql; mysql>update user set host = '%' where user = 'root'; mysql>flush privileges; mysql>select host, user from use ...
  • mongoDB中存儲的數據單元被稱作文檔。文檔的格式與JSON很類似,只不過由於JSON表達的數據類型範圍太小(null,boolean,numeric,string和object),mongoDB對其做了擴充,並命名為BSON。下麵對各個數據類型進行介紹。 null {‘x’: null},數據為... ...
  • 最近幫客戶實施的基於SQL Server AlwaysOn跨機房切換項目 最近一個來自重慶的客戶找到走起君,客戶的業務是做移動互聯網支付,是微信支付收單渠道合作伙伴,資料庫里存儲的是支付流水和交易流水。 由於客戶那邊沒有DBA,所以找到走起君商量一個資料庫伺服器搬遷項目。 項目背景 客戶需要把在10 ...
  • 索引概念 B+樹索引分為聚集索引和非聚集索引(輔助索引),但是兩者的數據結構都和B+樹一樣,區別是存放的內容。 可以說資料庫必須有索引,沒有索引則檢索過程變成了順序查找,O(n)的時間複雜度幾乎是不能忍受的。我們非常容易想象出一個只有單關鍵字組成的表如何使用B+樹進行索引,只要將關鍵字存儲到樹的節點 ...
  • 環境: 14.04.1 Ubuntu 1、安裝Nginx ubantu安裝完Nginx後,文件結構大致為: 所有的配置文件都在 下; 啟動程式文件在 下; 日誌文件在 下,分別是access.log和error.log; 並且在 下創建了nginx啟動腳本 安裝完成後可以嘗試啟動nginx: 然後能 ...
  • 翻譯沒有追求信達雅,不是為了學英語翻譯,是為了快速瞭解新特性,如有語義理解錯誤可以指正。歡迎加微信12735770或QQ12735770探討oracle技術問題:) In-Memory Column Store記憶體列存儲 Starting in Oracle Database 12c Release ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...