MapReduce -- 統計天氣信息統計

来源:http://www.cnblogs.com/one--way/archive/2016/07/11/5661148.html
-Advertisement-
Play Games

示例 數據: 要求: 將每年每月中的氣溫排名前三的數據找出來 實現: 1.每一年用一個reduce任務處理; 2.創建自定義數據類型,存儲 [年-月-日-溫度]; 2.自己實現排序函數 根據 [年-月-溫度] 降序排列,也可以在定義數據類型中進行排序; 3.自己實現分組函數,對 [年-月] 分組,r ...


示例

 

數據:

1949-10-01 14:21:02    34c
1949-10-02 14:01:02    36c
1950-01-01 11:21:02    32c
1950-10-01 12:21:02    37c
1951-12-01 12:21:02    23c
1951-12-02 12:21:02    45c
1951-12-03 12:21:02    50c
1951-12-23 12:21:02    33c
1950-10-02 12:21:02    41c
1950-10-03 12:21:02    27c
1951-07-01 12:21:02    45c
1951-07-02 12:21:02    46c
1951-07-03 12:21:03    47c

 

要求:

將每年每月中的氣溫排名前三的數據找出來

實現:

1.每一年用一個reduce任務處理;

2.創建自定義數據類型,存儲 [年-月-日-溫度];

2.自己實現排序函數 根據 [年-月-溫度] 降序排列,也可以在定義數據類型中進行排序;

3.自己實現分組函數,對 [年-月] 分組,reduce的key是分組結果,根據相同的分組值,統計reduce的value值,只統計三個值就可以,因為已經實現了自排序函數。

 

註意點:

1.自定義數據類型的使用;

2.自定義排序類的使用;

3.自定義分組類的使用,分組類對那些數據進行分組;

4.自定義分區類,分區類與reduce job個數的關係;

 

示例代碼:

RunJob.java  

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.LongWritable;
  5 import org.apache.hadoop.io.NullWritable;
  6 import org.apache.hadoop.io.Text;
  7 import org.apache.hadoop.mapreduce.Job;
  8 import org.apache.hadoop.mapreduce.Mapper;
  9 import org.apache.hadoop.mapreduce.Reducer;
 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 12 
 13 import java.io.IOException;
 14 import java.text.ParseException;
 15 import java.text.SimpleDateFormat;
 16 import java.util.Calendar;
 17 import java.util.Date;
 18 
 19 /**
 20  * weather 統計天氣信息
 21  *
 22  * 數據:
 23  *  1999-10-01 14:21:02    34c
 24  *  1999-11-02 13:01:02    30c
 25  *
 26  * 要求:
 27  * 將每年的每月中氣溫排名前三的數據找出來
 28  *
 29  * 實現:
 30  * 1.每一年用一個reduce任務處理;
 31  * 2.創建自定義數據類型,存儲 [年-月-日-溫度];
 32  * 2.自己實現排序函數 根據 [年-月-溫度] 降序排列,也可以在定義數據類型中進行排序;
 33  * 3.自己實現分組函數,對 [年-月] 分組,reduce的key是分組結果,根據相同的分組值,統計reduce的value值,只統計三個值就可以,因為已經實現了自排序函數。
 34  *
 35  * Created by Edward on 2016/7/11.
 36  */
 37 public class RunJob {
 38 
 39     public static void main(String[] args)
 40     {
 41         //access hdfs's user
 42         System.setProperty("HADOOP_USER_NAME","root");
 43 
 44         Configuration conf = new Configuration();
 45         conf.set("fs.defaultFS", "hdfs://node1:8020");
 46 
 47 
 48         try {
 49             FileSystem fs = FileSystem.get(conf);
 50 
 51             Job job = Job.getInstance(conf);
 52             job.setJarByClass(RunJob.class);
 53             job.setMapperClass(MyMapper.class);
 54             job.setReducerClass(MyReducer.class);
 55 
 56             //需要指定 map out 的 key 和 value
 57             job.setOutputKeyClass(InfoWritable.class);
 58             job.setOutputValueClass(Text.class);
 59 
 60             //設置分區 繼承 HashPartitioner
 61             job.setPartitionerClass(YearPartition.class);
 62             //根據年份創建指定數量的reduce task
 63             job.setNumReduceTasks(3);
 64 
 65             //設置排序 繼承 WritableComparator
 66             //job.setSortComparatorClass(SortComparator.class);
 67 
 68             //設置分組 繼承 WritableComparator 對reduce中的key進行分組
 69             job.setGroupingComparatorClass(GroupComparator.class);
 70 
 71             FileInputFormat.addInputPath(job, new Path("/test/weather/input"));
 72 
 73             Path path = new Path("/test/weather/output");
 74             if(fs.exists(path))//如果目錄存在,則刪除目錄
 75             {
 76                 fs.delete(path,true);
 77             }
 78             FileOutputFormat.setOutputPath(job, path);
 79 
 80             boolean b = job.waitForCompletion(true);
 81             if(b)
 82             {
 83                 System.out.println("OK");
 84             }
 85 
 86         } catch (Exception e) {
 87             e.printStackTrace();
 88         }
 89     }
 90 
 91 
 92     public static class MyMapper extends Mapper<LongWritable, Text, InfoWritable, Text > {
 93 
 94         private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 95 
 96         @Override
 97         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 98             String[] str = value.toString().split("\t");
 99 
100             try {
101                 Date date = sdf.parse(str[0]);
102                 Calendar c = Calendar.getInstance();
103                 c.setTime(date);
104                 int year = c.get(Calendar.YEAR);
105                 int month = c.get(Calendar.MONTH)+1;
106                 int day = c.get(Calendar.DAY_OF_MONTH);
107 
108                 double temperature = Double.parseDouble(str[1].substring(0,str[1].length()-1));
109 
110                 InfoWritable info = new InfoWritable();
111                 info.setYear(year);
112                 info.setMonth(month);
113                 info.setDay(day);
114                 info.setTemperature(temperature);
115 
116                 context.write(info, value);
117 
118             } catch (ParseException e) {
119                 e.printStackTrace();
120             }
121         }
122     }
123 
124     public static class MyReducer extends Reducer<InfoWritable, Text, Text, NullWritable> {
125         @Override
126         protected void reduce(InfoWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
127             int i=0;
128             for(Text t: values)
129             {
130                 i++;
131                 if(i>3)
132                     break;
133                 context.write(t, NullWritable.get());
134             }
135         }
136     }
137 }

 

InfoWritable.java 

 1 import org.apache.hadoop.io.WritableComparable;
 2 
 3 import java.io.DataInput;
 4 import java.io.DataOutput;
 5 import java.io.IOException;
 6 
 7 /**
 8  * 自定義數據類型 繼承 WritableComparable
 9  * 【年-月-日-溫度】
10  * Created by Edward on 2016/7/11.
11  */
12 public class InfoWritable implements WritableComparable<InfoWritable> {
13 
14     private int year;
15     private int month;
16     private int day;
17     private double temperature;
18 
19     public void setYear(int year) {
20         this.year = year;
21     }
22 
23     public void setMonth(int month) {
24         this.month = month;
25     }
26 
27     public void setDay(int day) {
28         this.day = day;
29     }
30 
31     public void setTemperature(double temperature) {
32         this.temperature = temperature;
33     }
34 
35     public int getYear() {
36         return year;
37     }
38 
39     public int getMonth() {
40         return month;
41     }
42 
43     public int getDay() {
44         return day;
45     }
46 
47     public double getTemperature() {
48         return temperature;
49     }
50 
51     /**
52      *
53      * 對象比較,對溫度進行倒序排序
54      */
55     @Override
56     public int compareTo(InfoWritable o) {
57 
58         int result = Integer.compare(this.getYear(),o.getYear());
59         if(result == 0)
60         {
61             result = Integer.compare(this.getMonth(),o.getMonth());
62             if(result == 0)
63             {
64                 return -Double.compare(this.getTemperature(), o.getTemperature());
65             }
66             else
67                 return result;
68         }
69         else
70             return result;
71 
72         //return this==o?0:-1;
73     }
74 
75     @Override
76     public void write(DataOutput dataOutput) throws IOException {
77         dataOutput.writeInt(this.year);
78         dataOutput.writeInt(this.month);
79         dataOutput.writeInt(this.day);
80         dataOutput.writeDouble(this.temperature);
81     }
82 
83     @Override
84     public void readFields(DataInput dataInput) throws IOException {
85         this.year = dataInput.readInt();
86         this.month = dataInput.readInt();
87         this.day = dataInput.readInt();
88         this.temperature = dataInput.readDouble();
89     }
90 }

 

YearPartition.java

 1 import org.apache.hadoop.io.Text;
 2 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 3 
 4 /**
 5  *
 6  * 創建分區,通過key中的year來創建分區
 7  *
 8  * Created by Edward on 2016/7/11.
 9  */
10 public class YearPartition extends HashPartitioner <InfoWritable, Text>{
11     @Override
12     public int getPartition(InfoWritable key, Text value, int numReduceTasks) {
13         return key.getYear()%numReduceTasks;
14     }
15 }

 

GroupComparator.java

 1 import org.apache.hadoop.io.WritableComparable;
 2 import org.apache.hadoop.io.WritableComparator;
 3 
 4 /**
 5  * 創建分組類,繼承WritableComparator
 6  * 【年-月】
 7  * Created by Edward on 2016/7/11.
 8  */
 9 public class GroupComparator extends WritableComparator {
10 
11     GroupComparator()
12     {
13         super(InfoWritable.class, true);
14     }
15 
16     @Override
17     public int compare(WritableComparable a, WritableComparable b) {
18         InfoWritable ia = (InfoWritable)a;
19         InfoWritable ib = (InfoWritable)b;
20 
21         int result = Integer.compare(ia.getYear(),ib.getYear());
22         if(result == 0)
23         {
24             return Integer.compare(ia.getMonth(),ib.getMonth());
25         }
26         else
27             return result;
28     }
29 }

 

SortComparator.java

 1 import org.apache.hadoop.io.WritableComparable;
 2 import org.apache.hadoop.io.WritableComparator;
 3 
 4 /**
 5  * 排序類,繼承WritableComparator
 6  * 排序規則【年-月-溫度】 溫度降序
 7  * Created by Edward on 2016/7/11.
 8  */
 9 public class SortComparator extends WritableComparator {
10 
11     /**
12      * 調用父類的構造函數
13      */
14     SortComparator()
15     {
16         super(InfoWritable.class, true);
17     }
18 
19 
20     /**
21      * 比較兩個對象的大小,使用降序排列
22      * @param a
23      * @param b
24      * @return
25      */
26     @Override
27     public int compare(WritableComparable a, WritableComparable b) {
28 
29         InfoWritable ia = (InfoWritable)a;
30         InfoWritable ib = (InfoWritable)b;
31 
32         int result = Integer.compare(ia.getYear(),ib.getYear());
33         if(result == 0)
34         {
35             result = Integer.compare(ia.getMonth(),ib.getMonth());
36             if(result == 0)
37             {
38                 return -Double.compare(ia.getTemperature(), ib.getTemperature());
39             }
40             else
41                 return result;
42         }
43         else
44             return result;
45     }
46 }

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 數據的插入(INSERT語句的使用方法) 原則上,執行一次INSERT語句會插入一行數據。 對錶進行全列INSERT時,可以省略表名後的列清單。 省略INSERT語句中的列名,就會自動設定為該列的預設值(沒有預設值時會設定為NULL)。 從其他表中複製數據:INSERT …SELECT。 INSER ...
  • 1,很奇怪,控制面板找不到SQL的卸載程式,後來在網上查知需要修複windows卸載/安裝程式,官網下載網址: 1.1 下載雙擊打開後,直接選next 1.2 根據你的情況選擇,我這裡選擇“卸載” 1.3 這裡可以看到你電腦中所有安裝程式,選中你所需要修複的卸載程式,點擊next(我這圖是卸載完再截 ...
  • 第一招、mysql服務的啟動和停止 net stop mysql net start mysql 第二招、登陸mysql 語法如下: mysql -u用戶名 -p用戶密碼 鍵入命令mysql -uroot -p, 回車後提示你輸入密碼,輸入12345,然後回車即可進入到mysql中了,mysql的提 ...
  • 為什麼我也要說SQL Server的並行: 這幾天園子里寫關於SQL Server並行的文章很多,不管怎麼樣,都讓人對並行操作有了更深刻的認識。 我想說的是:儘管並行操作可能(並不是一定)存在這樣或者那樣的問題,但是我們不能否認並行,仍然要利用好並行。 但是,實際開發中,某些SQL語句的寫法會導致用 ...
  • extent--最小空間分配單位 --tablespace managementblock --最小i/o單位 --segment managementcreate tablespace jamesdatafile '/export/home/oracle/oradata/james.dbf'siz ...
  • 註: sql server 2005 及以上支持. 版本估計是不支持(工作環境2005,2008). 工作需要, 需要向SQL Server 現有表中添加新列並添加描述. 從而有個如下存儲過程. (先附上存儲過程然後解釋) 代碼 /********調用方法********** 作用: 添加列並添加列 ...
  • 數據說明: 對錶進行聚合查詢 聚合函數: COUNT:計算表中的記錄數(行數)。 SUM:計算表中數值列的數據合計值。 AVG:計算表中數值列的數據平均值。 MAX:求出表中任意列中數據的最大值。 MIN:求出表中任意列中數據的最小值。 COUNT函數的結果根據參數的不同而不同。COUNT(*)會得 ...
  • Timeout 時間已到。在操作完成之前超時時間已過或伺服器未響應。 嘗試連接到 Principle 伺服器時發生了此故障。一個正常的邏輯,突然報錯,除了超時,還有這個Principle 伺服器故障.查詢相關信息,google就查出來說阿裡雲曾經報錯,然後是因為資料庫故障,於是在msdn上查找相關數 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...