MapRedue開發實例

来源:http://www.cnblogs.com/learn21cn/archive/2016/12/04/6130109.html
-Advertisement-
Play Games

一些例子,所用版本為hadoop 2.6.5 1、統計字數 數據格式如下(單詞,頻數,以tab分開): 2、統計用戶在網站的停留時間 數據格式(用戶,毫秒數,網站,以tab分開): 運行:hadoop jar ~/c02mrtest.jar com.mr.test.MRWeb TestData/we ...


一些例子,所用版本為hadoop 2.6.5

1、統計字數

數據格式如下(單詞,頻數,以tab分開):

A    100
B    97
C    98
A 98
 1 package com.mr.test;
 2 
 3 import java.io.IOException;
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.io.IntWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Job;
 9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13 
14 public class MRTest {
15     
16     public static class C01Mapper extends Mapper<Object, Text, Text, IntWritable> {
17         
18         @Override
19         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
20             String[] line = value.toString().split("\t");
21             if(line.length == 2) {
22                 context.write(new Text(line[0]),new IntWritable(Integer.parseInt(line[1])));                
23             }            
24         }
25     }
26     
27     public static class C01Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
28         
29         @Override
30         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
31             int i =0;
32             for(IntWritable value : values){
33                 i += value.get();
34             }
35             context.write(key, new IntWritable(i));
36         }        
37     }    
38 
39     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
40         //參數含義: agrs[0]標識 in, agrs[1]標識 out,agrs[2]標識 unitmb,agrs[3]標識 reducer number,
41         
42         int unitmb =Integer.valueOf(args[2]);
43         String in = args[0];
44         String out = args[1];
45         int nreducer = Integer.valueOf(args[3]);
46         
47         Configuration conf = new Configuration();
48         conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
49         conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
50         conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
51         conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));
52         
53         Job job = new Job(conf);
54         FileInputFormat.addInputPath(job, new Path(in));
55         FileOutputFormat.setOutputPath(job, new Path(out));
56         job.setMapperClass(C01Mapper.class);
57         job.setReducerClass(C01Reducer.class);
58         job.setNumReduceTasks(nreducer);
59         job.setCombinerClass(C01Reducer.class);
60         job.setMapOutputKeyClass(Text.class);
61         job.setMapOutputValueClass(IntWritable.class);
62         job.setOutputKeyClass(Text.class);
63         job.setOutputValueClass(IntWritable.class);
64         job.setJarByClass(MRTest.class);
65         job.waitForCompletion(true);
66     }
67 }

2、統計用戶在網站的停留時間

數據格式(用戶,毫秒數,網站,以tab分開):

A	100	baidu.com
B	900	google.com
C	515	sohu.com
D	618	sina.com
E	791	google.com
B	121	baidu.com
C	915	google.com
D	112	sohu.com
E	628	sina.com
A	681	google.com
C	121	baidu.com
D	215	google.com
E	812	sohu.com
A	128	sina.com
B	291	google.com
  1 package com.mr.test;
  2 
  3 import java.io.IOException;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.Path;
  6 import org.apache.hadoop.io.IntWritable;
  7 import org.apache.hadoop.io.Text;
  8 import org.apache.hadoop.io.WritableComparable;
  9 import org.apache.hadoop.io.WritableComparator;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Partitioner;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 
 17 public class MRWeb {
 18 
 19     public static class C02Mapper extends Mapper<Object, Text, Text, Text> {
 20         @Override
 21         public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
 22             String line[] = value.toString().split("\t");
 23             //格式檢查
 24             if(line.length == 3){
 25                 String name = line[0];
 26                 String time = line[1];
 27                 String website = line[2];    
 28                 context.write(new Text(name + "\t" + time), new Text(time + "\t" + website));
 29             }                
 30         }        
 31     }    
 32     
 33     public static class C02Partitioner extends Partitioner<Text, Text> {
 34         
 35         @Override
 36         public int getPartition(Text key, Text value, int number) {
 37             String name = key.toString().split("\t")[0];
 38             int hash =name.hashCode();    
 39             //以此實現分區
 40             return Math.abs(hash % number);
 41         }
 42         
 43     }
 44     
 45     public static class C02Sort extends WritableComparator {
 46         //必須有的
 47         protected C02Sort() {
 48             super(Text.class,true);            
 49         }
 50         
 51         @Override
 52         public int compare(WritableComparable w1, WritableComparable w2) {
 53             Text h1 = new Text(((Text)w1).toString().split("\t")[0] );
 54             Text h2 = new Text(((Text)w2).toString().split("\t")[0] );
 55             IntWritable m1 =new IntWritable(Integer.valueOf(((Text)w1).toString().split("\t")[1]));
 56             IntWritable m2 =new IntWritable(Integer.valueOf(((Text)w2).toString().split("\t")[1]));
 57             
 58             int result;
 59             if(h1.equals(h2)){
 60                 result = m2.compareTo(m1);
 61             }else {
 62                 result =h1.compareTo(h2);
 63             }    
 64             return result;
 65         }
 66     }
 67     
 68     public  static class C02Group extends WritableComparator{
 69         protected C02Group() {
 70             super(Text.class,true);            
 71         }
 72         @Override
 73         public int compare(WritableComparable w1, WritableComparable w2) {
 74             Text h1 = new Text(((Text)w1).toString().split("\t")[0] );
 75             Text h2 = new Text(((Text)w2).toString().split("\t")[0] );
 76                         
 77             return h1.compareTo(h2);
 78         }        
 79     }
 80     
 81     public static class C02Reducer extends Reducer<Text, Text, IntWritable, Text> {
 82         
 83         @Override
 84         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 85             int count = 0;
 86             String name =key.toString().split("\t")[0];
 87             //分組排序已經做好了,這裡只管列印
 88             for(Text value : values){
 89                 count++;
 90                 StringBuffer buffer = new StringBuffer();
 91                 buffer.append(name);
 92                 buffer.append("\t");
 93                 buffer.append(value.toString());
 94                 context.write(new IntWritable(count), new Text(buffer.toString()));                
 95             }
 96         }
 97     }
 98     
 99     public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
100         //參數含義: agrs[0]標識 in, agrs[1]標識 out,agrs[2]標識 unitmb,agrs[3]標識 reducer number,
101         if(args.length != 4){
102             System.out.println("error");
103             System.exit(0);
104         }
105         
106         int unitmb =Integer.valueOf(args[2]);
107         String in = args[0];
108         String out = args[1];
109         int nreducer = Integer.valueOf(args[3]);
110                 
111         Configuration conf = new Configuration();
112         conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
113         conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
114         conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
115         conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));
116                 
117         Job job = new Job(conf);
118         FileInputFormat.addInputPath(job, new Path(in));
119         FileOutputFormat.setOutputPath(job, new Path(out));
120         job.setMapperClass(C02Mapper.class);
121         job.setReducerClass(C02Reducer.class);
122         job.setNumReduceTasks(nreducer);
123         job.setPartitionerClass(C02Partitioner.class);
124         job.setGroupingComparatorClass(C02Group.class);
125         job.setSortComparatorClass(C02Sort.class);        
126         job.setMapOutputKeyClass(Text.class);
127         job.setMapOutputValueClass(Text.class);
128         job.setOutputKeyClass(IntWritable.class);
129         job.setOutputValueClass(Text.class);
130         job.setJarByClass(MRWeb.class);
131         job.waitForCompletion(true);
132     }
133 }

運行:hadoop jar ~/c02mrtest.jar com.mr.test.MRWeb TestData/webcount.txt /DataWorld/webresult 128 1

結果的樣子:

 

3、json數組分析

數據格式(前面以tab分開):

1	[{"name":"A","age":16,"maths":100}]
2	[{"name":"B","age":17,"maths":97}]
3	[{"name":"C","age":18,"maths":89}]
4	[{"name":"D","age":15,"maths":98}]
5	[{"name":"E","age":19,"maths":100}]
 1 package com.mr.test;
 2 
 3 import java.io.IOException;
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11 import net.sf.json.JSONArray;
12 import net.sf.json.JSONObject;
13 
14 public class MRString {
15     
16     public static class C03Mapper extends Mapper<Object, Text, Text, Text> {
17         @Override
18         protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
19                 throws IOException, InterruptedException {
20             String[] line = value.toString().split("\t");
21             if(line.length ==2){
22                 String c = line[0];
23                 String j = line[1];
24                 JSONArray jsonArray =JSONArray.fromObject(j);
25                 int size = jsonArray.size();
26                 for(int i=0;i<size;i++){
27                     String name = "";
28                     String age = "";
29                     String maths = "";
30                     JSONObject jsonObject =jsonArray.getJSONObject(i);
31                     if(jsonObject.containsKey("name")){
32                         name = jsonObject.getString("name");
33                     }
34                     if(jsonObject.containsKey("age")){
35                         age = jsonObject.getString("age");
36                     }
37                     if(jsonObject.containsKey("maths")){
38                         maths = jsonObject.getString("maths");
39                     }
40                     StringBuffer buffer =new StringBuffer();
41                     buffer.append(name);
42                     buffer.append("\t");
43                     buffer.append(age);
44                     buffer.append("\t");
45                     buffer.append(maths);
46                     context.write(new Text(c), new Text(buffer.toString()));                    
47                 }
48             }            
49         }        
50     }
51 
52     public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
53         //參數含義: agrs[0]標識 in, agrs[1]標識 out,agrs[2]標識 unitmb,agrs[3]
54         if(args.length != 3){
55             System.out.println("error");
56             System.exit(0);
57         }
58         
59         int unitmb =Integer.valueOf(args[2]);
60         String in = args[0];
61         String out = args[1];
62                         
63         Configuration conf = new Configuration();
64         conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
65         conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
66         conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
67         conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));
68                 
69         Job job = new Job(conf);
70         job.addFileToClassPath(new Path("TestData/json-lib-2.4-jdk15.jar"));
71         job.addFileToClassPath(new Path("TestData/ezmorph-1.0.6.jar"));
72         FileInputFormat.addInputPath(job, new Path(in));
73         FileOutputFormat.setOutputPath(job, new Path(out));
74         job.setMapperClass(C03Mapper.class);
75         //沒有reducer的情況下必須設置
76         job.setNumReduceTasks(0);                
77         job.setMapOutputKeyClass(Text.class);
78         job.setMapOutputValueClass(Text.class);
79         job.setOutputKeyClass(Text.class);
80         job.setOutputValueClass(Text.class);
81         job.setJarByClass(MRString.class);
82         job.waitForCompletion(true);
83     }
84 }

運行 hadoop jar ~/c03mrtest.jar com.mr.test.MRString TestData/jsonarray.txt /DataWorld/jsonoutput 128

結果:

這個例子還有一點值得註意(Path中的目錄是HDFS中的目錄):

 job.addFileToClassPath(new Path("TestData/json-lib-2.4-jdk15.jar")); //jar文件下載地址:http://json-lib.sourceforge.net/

 job.addFileToClassPath(new Path("TestData/ezmorph-1.0.6.jar")); //jar文件下載地址:http://ezmorph.sourceforge.net/
使用這兩句,在程式中動態添加了用於json解析的jar文件,而利用伺服器中的ClassPath是訪問不到這兩個文件的。在編程的時候,在windows客戶端下,為了語法書寫方便,導入了json-lib-2.4-jdk15.jar,但是並沒有導入ezmorph-1.0.6.jar

也就是說,可以在程式中動態的加入jar文件,只要知道了它在HDFS中的位置。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 註: 出於職業要求, 本文中所有數字均被人為修改過, 並非真實數字, 很抱歉也不能貼出源代碼 目標: 個險客戶特征分析 背景: 目前市場部使用的推廣活動分析系統只能針對客戶調查返回的信息分析,且僅有年齡/性別/婚姻狀態/收入四個維度, 預測精度不高. 市場部希望能從現有的壽險客戶信息分析出影響他們選 ...
  • 當with和as一起用時,表示定義一個SQL字句 例: with sonword as ( select * from person ) select * from student where name in (select name from sonword) 相當於 select * from ...
  • mysql -u root -pvmware mysql>use mysql; mysql>update user set host = '%' where user = 'root'; mysql>flush privileges; mysql>select host, user from use ...
  • mongoDB中存儲的數據單元被稱作文檔。文檔的格式與JSON很類似,只不過由於JSON表達的數據類型範圍太小(null,boolean,numeric,string和object),mongoDB對其做了擴充,並命名為BSON。下麵對各個數據類型進行介紹。 null {‘x’: null},數據為... ...
  • 最近幫客戶實施的基於SQL Server AlwaysOn跨機房切換項目 最近一個來自重慶的客戶找到走起君,客戶的業務是做移動互聯網支付,是微信支付收單渠道合作伙伴,資料庫里存儲的是支付流水和交易流水。 由於客戶那邊沒有DBA,所以找到走起君商量一個資料庫伺服器搬遷項目。 項目背景 客戶需要把在10 ...
  • 索引概念 B+樹索引分為聚集索引和非聚集索引(輔助索引),但是兩者的數據結構都和B+樹一樣,區別是存放的內容。 可以說資料庫必須有索引,沒有索引則檢索過程變成了順序查找,O(n)的時間複雜度幾乎是不能忍受的。我們非常容易想象出一個只有單關鍵字組成的表如何使用B+樹進行索引,只要將關鍵字存儲到樹的節點 ...
  • 環境: 14.04.1 Ubuntu 1、安裝Nginx ubantu安裝完Nginx後,文件結構大致為: 所有的配置文件都在 下; 啟動程式文件在 下; 日誌文件在 下,分別是access.log和error.log; 並且在 下創建了nginx啟動腳本 安裝完成後可以嘗試啟動nginx: 然後能 ...
  • 翻譯沒有追求信達雅,不是為了學英語翻譯,是為了快速瞭解新特性,如有語義理解錯誤可以指正。歡迎加微信12735770或QQ12735770探討oracle技術問題:) In-Memory Column Store記憶體列存儲 Starting in Oracle Database 12c Release ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...