Hadoop 綜合揭秘——MapReduce 編程實例(詳細介紹 Combine、Partitioner、WritableComparable、WritableComparator 使用方式)

来源:https://www.cnblogs.com/leslies2/archive/2018/08/16/9009574.html
-Advertisement-
Play Games

本章主要介紹了 MapReduce 的開發原理及應用場景,講解如何利用 Combine、Partitioner、WritableComparable、WritableComparator 等組件對數據進行排序篩選聚合分組的功能。利用例子模仿 SQL 關係資料庫進行SELECT、WHERE、GROUP... ...


前言

本文主要介紹 MapReduce 的原理及開發,講解如何利用 Combine、Partitioner、WritableComparator等組件對數據進行排序篩選聚合分組的功能。
由於文章是針對開發人員所編寫的,在閱讀本文前,文章假設讀者已經對Hadoop的工作原理、安裝過程有一定的瞭解,因此對Hadoop的安裝就不多作說明。請確保源代碼運行在Hadoop 2.x以上版本,並以偽分佈形式安裝以方便進行調試(單機版會對 Partitioner 功能進行限制)。
文章主要利用例子介紹如何利用 MapReduce 模仿 SQL 關係資料庫進行SELECT、WHERE、GROUP、JOIN 等操作,並對 GroupingComparator、SortComparator 等功能進行說明。
希望本篇文章能對各位的學習研究有所幫助,當中有所錯漏的地方敬請點評。

 

目錄

一、MapReduce 工作原理簡介

二、MapReduce 開發實例

三、利用 Partitioner 控制鍵值分配

四、利用 Combiner 提高系統性能

五、WritableComparatable 自定義鍵值說明

六、實現數據排序與分組

七、數據集連接處理方式介紹

 

 

 

一、MapReduce 工作原理簡介

對Hadoop有興趣的朋友相信對Hadoop的主要工作原理已經有一定的認識,在講解MapReduce的程式開發前,本文先針對Mapper、Reducer、Partitioner、Combiner、Suhffle、Sort的工作原理作簡單的介紹,以幫助各位更好地瞭解後面的內容。

圖 1.1

1.1 Mapper 階段

當系統對數據進行分片後,每個輸入分片會分配到一個Mapper任務來處理,預設情況下系統會以HDFS的一個塊大小64M作為分片大小,當然也可以通過配置文件設置塊的大小。隨後Mapper節點輸出的數據將保存到一個緩衝區中(緩衝區的大小預設為512M,可通過mapreduce.task.io.sort.mb屬性進行修改),緩衝區越大排序效率越高。當該緩衝區快要溢出時(緩衝區預設大小為80%,可通過mapreduce.map.sort.spill.percent屬性進行修改),系統會啟動一個後臺線程,將數據傳輸到會到本地的一個文件當中。

1.2 Partitioner 階段

在Mapper完成 KEY/VALUE 格式的數據操作後,Partitioner 將會被調用,由於真實環境中 Hadoop 可能會包含幾十個甚至上百個Reducer ,Partitioner 的主要作用就是根據自定義方式確定數據將被傳輸到哪一個Reducer進行處理。

1.3 Combiner 階段

如果系統定義了Combiner,在經過 Partitioner 排序處理後將會進行 Combiner處理。我們可以把 Combiner 看作為一個小型的 Reducer ,由於數據從 Mapper 通過網路傳送到 Reducer ,資源開銷很大,Combiner 目的就是在數據傳送到Reducer前作出初步聚集處理,減少伺服器的壓力。如果數據量太大,還可以把 mapred.compress.map.out 設置為 true,就可以將數據進行壓縮。(關於數據壓縮的內容已經超越本文的討論範圍,以後會有獨立的篇章針對數據壓縮進行專題討論,敬請期待)

1.4 Shuffle 階段

在 Shuffle 階段,每個 Reducer 會啟動 5 個線程(可通過 mapreduce.reduce.shuffle.parallelcopies 進行設置)通過HTTP協議獲取Mapper傳送過來的數據。每次數據發送到 Reducer 前,都會根據鍵先進行排序。開發人員也可通過自定義的 SortComparator 進行數據排序,也是根據 GroupComparator 按照數據的其他特性進行分組處理,下麵章節將會詳細舉例介紹。對數據進行混洗、排序完成後,將傳送到對應的Reducer進行處理。

1.5 Reducer 階段

當 Mapper 實例完成輸入的數據超過設定值後(可通過mapreduce.job.reduce.slowstart.completedmaps 進行設置), Reducer 就會開始執行。Reducer 會接收到不同 Mapper 任務傳來已經過排序的數據,並通過Iterable 介面進行處理。在 Partitioner 階段,系統已定義哪些數據將由個 Reducer 進行管理。當 Reducer 檢測到 KEY 時發生變化時,系統就會按照已定的規則生成一個新的 Reducer 對數據進行處理。
如果 Reducer 端接受的數據量較小,數據則可直接存儲在記憶體緩衝區中,方便後面的數據輸出(緩衝區大小可通過mapred.job.shuffle.input.buffer.percent 進行設置)
如果數據量超過了該緩衝區大小的一定比例(可以通過 mapred.job.shuffle.merge.percent 進行設置),數據將會被合併後寫到磁碟中。

回到目錄

二、MapReduce 開發實例

上一章節講解了 MapReduce 的主要流程,下麵將以幾個簡單的例子模仿 SQL 關係資料庫向大家介紹一下 MapReduce 的開發過程。

HDFS常用命令  (此處只介紹幾個常用命令,詳細內容可在網上查找)

  • 創建目錄    hdfs dfs -mkdir -p 【Path】 
  • 複製文件    hdfs dfs -copyFromLocal 【InputPath】【OutputPath】
  • 查看目錄    hdfs dfs -ls 【Path】
  • 運行JAR    hadoop jar 【Jar名稱】 【Main類全名稱】 【InputPath】 【OutputPath】 

2.1 使用 SELECT 獲取數據

應用場景:假設在 hdfs 文件夾 / input / 20180509 路徑的 *.dat 類型文件中存放在著大量不同型號的 iPhone 手機當天在不同地區的銷售記錄,系統想對這些記錄進行統計,計算出不同型號手機的銷售總數。

計算時,在Mapper中獲取每一行的信息,並把iPhone名稱作為Key插入,把數據作為Value插入到Context當中。
當Reducer接收到相同Key數據後,再作統一處理。

註意  :   當前例子當中  Mapper 的輸入 Key 為  LongWritable 長類型

在此過程中要註意幾點: 例子中 SaleManager 繼承了 org.apache.hadoop.conf.Configured 類並實現了 org.apache.hadoop.util.Tool 介面的 public static int run(Configuration conf,Tool tool, String[] args) 方法,MapReduce的相關操作都在run裡面實現。由於 Configured 已經實現了 getConf() 與setConfig() 方法,創建Job時相關的配置信息就可通過getConf()方法讀入。

系統可以通過以下方法註冊Mapper及Reducer處理類
Job.setMapperClass(MyMapper.class);
Job.setReducerClass(MyReducer.class);

在整個運算過程當中,數據會經過篩選與計算,所以Mapper的讀入信息K1,V1與Reducer的輸出信息K3,V3不一定是同一格式。
org.apache.hadoop.mapreduce.Mapper<K1,V1,K2,V2>
org.apache.hadoop.mapreduce.Reducer<K2,V2,K3,V3>

當Mapper的輸出的鍵值類型與Reduces輸出的鍵值類型相同時,系統可以通過下麵方法設置轉出數據的格式
Job.setOutputKeyClass(K);
Job.setOutputValueClass(V);

當Mapper的輸出的鍵值類型與Reduces輸出的鍵值類型不相同時,系統則需要通過下麵方法設置Mapper轉出格式
Job.setMapOutputKeyClass(K);
Job.setMapOutputValueClass(V);

 1 public class Phone {
 2     public String type;
 3     public Integer count;
 4     public String area;
 5     
 6     public Phone(String line){
 7        String[] data=line.split(",");
 8        this.type=data[0].toString();
 9        this.count=Integer.valueOf(data[1].toString());
10        this.area=data[2].toString();
11     }
12     
13     public String getType(){
14         return this.type;
15     }
16     
17     public Integer getCount(){
18         return this.count;
19     }
20     
21     public String getArea(){
22         return this.area;
23     }
24 }
25 
26 public class SaleManager extends Configured implements Tool{
27     public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
28         public void map(LongWritable key,Text value,Context context)
29             throws IOException,InterruptedException{
30             String data=value.toString();
31             Phone iPhone=new Phone(data);
32             //以iPhone型號作為Key,數量為作Value傳入
33             context.write(new Text(iPhone.getType()), new IntWritable(iPhone.getCount()));
34         }
35     }
36     
37     public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
38         public void reduce(Text key,Iterable<IntWritable> values,Context context)
39             throws IOException,InterruptedException{
40             int sum=0;
41             //對同一型號的iPhone數量進行統計
42             for(IntWritable val : values){
43                 sum+=val.get();
44             }
45             context.write(key, new IntWritable(sum));
46         }
47     }
48 
49     public int run(String[] arg0) throws Exception {
50         // TODO 自動生成的方法存根
51         // TODO Auto-generated method stub
52         Job job=Job.getInstance(getConf());
53         job.setJarByClass(SaleManager.class);
54         //註冊Key/Value類型為Text
55         job.setOutputKeyClass(Text.class);
56         job.setOutputValueClass(IntWritable.class);
57         //註冊Mapper及Reducer處理類
58         job.setMapperClass(MyMapper.class);
59         job.setReducerClass(MyReducer.class);
60         //輸入輸出數據格式化類型為TextInputFormat
61         job.setInputFormatClass(TextInputFormat.class);
62         job.setOutputFormatClass(TextOutputFormat.class);
63         //預設情況下Reducer數量為1個(可忽略)
64         job.setNumReduceTasks(1);
65         //獲取命令參數
66         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
67         //設置讀入文件路徑
68         FileInputFormat.setInputPaths(job,new Path(args[0]));
69         //設置轉出文件路徑
70         FileOutputFormat.setOutputPath(job,new Path(args[1]));
71         boolean status=job.waitForCompletion(true);
72         if(status)
73             return 0;
74         else
75             return 1;
76     }
77     
78     public static void main(String[] args) throws Exception{
79         Configuration conf=new Configuration();
80         ToolRunner.run(new SaleManager(), args);
81     }
82 }

計算結果

 

2.2 使用 WHERE 對數據進行篩選

在計算過程中,並非所有的數據都適用於Reduce的計算,由於海量數據是通過網路傳輸的,所消耗的 I/O 資源巨大,所以可以嘗試在Mapper過程中提前對數據進行篩選。以上面的數據為例,當前系統只需要計算輸入參數地區的銷售數據。此時只需要修改一下Mapper類,重寫setup方法,通過Configuration類的 public String[] Configuration.getStrings(參數名,預設值) 方法獲取命令輸入的參數,再對數據進行篩選。

 1 public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
 2         private String area;
 3         
 4         @Override
 5         public void setup(Context context){
 6             this.area=context.getConfiguration().getStrings("area", "BeiJing")[0];
 7         }
 8         
 9         public void map(LongWritable key,Text value,Context context)
10             throws IOException,InterruptedException{
11             String data=value.toString();
12             Phone iPhone=new Phone(data);
13             if(this.area.equals(iPhone.area))
14                 context.write(new Text(iPhone.getType()), new IntWritable(iPhone.getCount()));
15         }
16     }

執行命令 hadoop jar 【Jar名稱】 【Main類全名稱】-D 【參數名=參數值】 【InputPath】 【OutputPath】
例如:hadoop jar hadoopTest-0.2.jar sun.hadoopTest.SaleManager -D area=BeiJing /tmp/input/050901 /tmp/output/050901 
此時系統將選擇 area 等於BeiJing 的數據進行統計
計算結果

回到目錄

三、利用 Partitioner 控制鍵值分配

3.1 深入分析 Partitioner

Partitioner 類在 org.apache.hadoop.mapreduce.Partitioner 中,通過 Job.setPartitionerClass(Class<? extends Partitioner> cls) 方法可綁定自定義的 Partitioner。若用戶沒有實現自定義Partitioner 時,系統將自動綁定 Hadoop 的預設類 org.apache.hadoop.mapreduce.lib.partiton.HashPartitioner 。Partitioner 包含一個主要方法是 int getPartition(K key,V value,int numReduceTasks) ,功能是控制將哪些鍵分配到哪個 Reducer。此方法的返回值是 Reducer 的索引值,若系統定義了4個Reducer,其返回值為0~3。numReduceTasks 側是當前系統的 Reducer 數量,此數量可通過Job.setNumReduceTasks(int tasks) 進行設置,在偽分佈環境下,其預設值為1。 

註意:

在單機環境下,系統只會使用一個 Reducer,這將導致 Partitioner 缺乏意義,這也是在本文引言中強調要使用偽分佈環境進行調試的原因 。

通過反編譯查看 HashPartitioner ,可見系統是通過(key.hashCode() & Interger.MAX_VALUE )%numReduceTasks 方法,根據 KEY 的 HashCode 對 Reducer 數量求餘方式,確定數據分配到哪一個 Reducer 進行處理的。但如果想根據用戶自定義的邏輯把數據分配到對應 Reducer,單依靠 HashPartitioner 是無法實現的,此時側需要自定義 Partitioner 。

1 public class HashPartitioner<K, V> extends Partitioner<K, V> {
2 
3   public int getPartition(K key, V value, int numReduceTasks) {
4     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
5   }
6 }

3.2 自定義 Partitioner

在例子當中,假設系統需要把北、上、廣、深4個不同的地區的iPhone銷售情況分別交付給不同 Reducer 進行統計處理。我們可以自定義一個 MyPartitioner, 通過 Job.setPartitionerClass( MyPartitioner.class ) 進行綁定。通過 Job.setNumReduceTasks(4) 設置4個Reducer 。以手機類型作為KEY,把銷售數據與地區作為VALUE。在 int getPartition(K key,V value,int numReduceTasks) 方法中,根據 VALUE 值的不同返回不同的索引值。

  1 public class Phone {
  2     public String type;
  3     public Integer count;
  4     public String area;
  5     
  6     public Phone(String line){
  7        String[] data=line.split(",");
  8        this.type=data[0].toString();
  9        this.count=Integer.valueOf(data[1].toString());
 10        this.area=data[2].toString();
 11     }
 12     
 13     public String getType(){
 14         return this.type;
 15     }
 16     
 17     public Integer getCount(){
 18         return this.count;
 19     }
 20     
 21     public String getArea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class MyPatitional extends Partitioner<Text,Text> {
 27 
 28     @Override
 29     public int getPartition(Text arg0, Text arg1, int arg2) {
 30         // TODO 自動生成的方法存根
 31         String area=arg1.toString().split(",")[0];
 32         // 根據不同的地區返回不同的索引值       
 33         if(area.contentEquals("BeiJing"))
 34             return 0;
 35         if(area.contentEquals("GuangZhou"))
 36             return 1;
 37         if(area.contentEquals("ShenZhen"))
 38             return 2;
 39         if(area.contentEquals("ShangHai"))
 40             return 3;
 41         return 0;
 42     }
 43 }    
 44 
 45 public class SaleManager extends Configured implements Tool{
 46     public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
 47         
 48         public void map(LongWritable key,Text value,Context context)
 49             throws IOException,InterruptedException{
 50             String data=value.toString();
 51             Phone iPhone=new Phone(data);
 52             context.write(new Text(iPhone.getType()), new Text(iPhone.getArea()+","+iPhone.getCount().toString()));
 53         }
 54     }
 55     
 56     public static class MyReducer extends Reducer<Text,Text,Text,IntWritable>{
 57         
 58         public void reduce(Text key,Iterable<Text> values,Context context)
 59             throws IOException,InterruptedException{
 60             int sum=0;
 61             //對同一型號的iPhone數量進行統計
 62             for(Text value : values){
 63                 String count=value.toString().split(",")[1];
 64                 sum+=Integer.valueOf(count).intValue();
 65             }
 66             context.write(key, new IntWritable(sum));
 67         }
 68     }
 69 
 70     public int run(String[] arg0) throws Exception {
 71         // TODO 自動生成的方法存根
 72          // TODO Auto-generated method stub
 73         Job job=Job.getInstance(getConf());
 74         job.setJarByClass(SaleManager.class);
 75         //註冊Key/Value類型為Text
 76         job.setOutputKeyClass(Text.class);
 77         job.setOutputValueClass(IntWritable.class);
 78         //若Map的轉出Key/Value不相同是需要分別註冊
 79         job.setMapOutputKeyClass(Text.class);
 80         job.setMapOutputValueClass(Text.class);
 81         //註冊Mapper及Reducer處理類
 82         job.setMapperClass(MyMapper.class);
 83         job.setReducerClass(MyReducer.class);
 84         //輸入輸出數據格式化類型為TextInputFormat
 85         job.setInputFormatClass(TextInputFormat.class);
 86         job.setOutputFormatClass(TextOutputFormat.class);
 87         //設置Reduce數量為4個,偽分散式情況下不設置時預設為1
 88         job.setNumReduceTasks(4);
 89         //註冊自定義Partitional類
 90         job.setPartitionerClass(MyPatitional.class);
 91         //獲取命令參數
 92         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
 93         //設置讀入文件路徑
 94         FileInputFormat.setInputPaths(job,new Path(args[0]));
 95         //設置轉出文件路徑
 96         FileOutputFormat.setOutputPath(job,new Path(args[1]));
 97         boolean status=job.waitForCompletion(true);
 98         if(status)
 99             return 0;
100         else
101             return 1;
102     }
103     
104     public static void main(String[] args) throws Exception{
105         Configuration conf=new Configuration();
106         ToolRunner.run(new SaleManager(), args);
107     }
108 }

計算結果


回到目錄

四、利用 Combiner 提高系統性能

在前面幾節所描述的例子當中,我們都是把所有的數據完整發送到 Reducer 中再作統計。試想一下,在真實環境當中,iPhone 的銷售記錄數以千萬計,如此巨大的數據需要在 Mapper/Reducer 當中進行傳輸,將會耗費多少的網路資源。這麼多年來 iPhone 出品的機型不過十多個,系統能否先針對同類的機型在Mapper端作出初步的聚合計算,再把計算結果發送到 Reducer。如此一來,傳到 Reducer 端的數據量將會大大減少,只要在適當的情形下使用將有利於系統的性能提升。
針對此類問題,Combiner 應運而生,我們可以把 Combiner 看作為一個小型的 Reducer ,它的目的就是在數據傳送到Reducer前在Mapper中作出初步聚集處理,減少伺服器之間的 I/O 數據傳輸壓力。Combiner 也繼承於Reducer,通過Job.setCombinerClass(Class<? extends Reducer> cls) 方法進行註冊。
下麵繼續以第3節的例子作為參考,系統想要在同一個Reducer中計算所有地區不同型號手機的銷售情況。我們可以把地區名作為KEY,把銷售數量和手機類型轉換成 MapWritable 作為 VALUE。當數據輸入後,不是直接把數據傳輸到 Reducer ,而是通過Combiner 把Mapper中不同的型號手機的銷售數量進行聚合計算,把5種型號手機的銷售總數算好後傳輸給Reducer。在Reducer中再把來源於不同 Combiner 的數據進行求和,得出最後結果。

註意  :   

MapWritable 是 系統自帶的 Writable 集合類中的其中一個,它實現了  java.util.Map<Writable,Writable> 介面,以單位元組充當類型數據的索引,常用於枚舉集合的元素。

 

  1 public class SaleManager extends Configured implements Tool{
  2     private static IntWritable TYPE=new IntWritable(0);
  3     private static IntWritable VALUE=new IntWritable(1);
  4     private static IntWritable IPHONE7=new IntWritable(2);
  5     private static IntWritable IPHONE7_PLUS=new IntWritable(3);
  6     private static IntWritable IPHONE8=new IntWritable(4);
  7     private static IntWritable IPHONE8_PLUS=new IntWritable(5);
  8     private static IntWritable IPHONEX=new IntWritable(6);
  9     
 10     public static class MyMapper extends Mapper<LongWritable,Text,Text,MapWritable>{
 11         
 12         public void map(LongWritable key,Text value,Context context)
 13             throws IOException,InterruptedException{
 14             String data=value.toString();
 15             Phone iPhone=new Phone(data);
 16             context.write(new Text(iPhone.getArea()), getMapWritable(iPhone.getType(), iPhone.getCount()));
 17         }      
 18         
 19         private MapWritable getMapWritable(String type,Integer count){
 20             Text _type=new Text(type);
 21             Text _count=new Text(count.toString());
 22             MapWritable mapWritable=new MapWritable();
 23             mapWritable.put(TYPE,_type);
 24             mapWritable.put(VALUE,_count);
 25             return mapWritable;
 26         }
 27     }
 28     
 29     public static class MyCombiner extends Reducer<Text,MapWritable,Text,MapWritable> {
 30         public void reduce(Text key,Iterable<MapWritable> values,Context context) 
 31             throws IOException, InterruptedException{
 32                int iPhone7=0;
 33                int iPhone7_PLUS=0;
 34                int iPhone8=0;
 35                int iPhone8_PLUS=0;
 36                int iPhoneX=0;
 37                //對同一個Mapper所處理的不同型號的手機數據進行初步統計
 38                for(MapWritable value:values){
 39                     String type=value.get(TYPE).toString();
 40                     Integer count=Integer.valueOf(value.get(VALUE).toString());
 41                     if(type.contentEquals("iPhone7"))
 42                         iPhone7+=count;
 43                     if(type.contentEquals("iPhone7_PLUS"))
 44                         iPhone7_PLUS+=count;
 45                     if(type.contentEquals("iPhone8"))
 46                         iPhone8+=count;
 47                     if(type.contentEquals("iPhone8_PLUS"))
 48                         iPhone8_PLUS+=count;
 49                     if(type.contentEquals("iPhoneX"))
 50                         iPhoneX+=count;
 51                 }                
 52                 MapWritable mapWritable=new MapWritable();
 53                 mapWritable.put(IPHONE7, new IntWritable(iPhone7));
 54                 mapWritable.put(IPHONE7_PLUS, new IntWritable(iPhone7_PLUS));
 55                 mapWritable.put(IPHONE8, new IntWritable(iPhone8));
 56                 mapWritable.put(IPHONE8_PLUS, new IntWritable(iPhone8_PLUS));
 57                 mapWritable.put(IPHONEX, new IntWritable(iPhoneX));
 58                 context.write(key,mapWritable);
 59         }
 60    }
 61     
 62     public static class MyReducer extends Reducer<Text,MapWritable,Text,Text>{
 63         public void reduce(Text key,Iterable<MapWritable> values,Context context)
 64             throws IOException,InterruptedException{
 65                int iPhone7=0;
 66                int iPhone7_PLUS=0;
 67                int iPhone8=0;
 68                int iPhone8_PLUS=0;
 69                int iPhoneX=0;
 70 
 71             //對同一地區不同型的iPhone數量進行統計
 72             for(MapWritable value : values){
 73                 iPhone7+=Integer.parseInt(value.get(IPHONE7).toString());
 74                 iPhone7_PLUS+=Integer.parseInt(value.get(IPHONE7_PLUS).toString());
 75                 iPhone8+=Integer.parseInt(value.get(IPHONE8).toString());
 76                 iPhone8_PLUS+=Integer.parseInt(value.get(IPHONE8_PLUS).toString());
 77                 iPhoneX+=Integer.parseInt(value.get(IPHONEX).toString());
 78             }
 79             
 80             StringBuffer data=new StringBuffer();
 81             data.append("iPhone7:"+iPhone7+"   ");
 82             data.append("iPhone7_PLUS:"+iPhone7_PLUS+"   ");
 83             data.append("iPhone8:"+iPhone8+"   ");
 84             data.append("iPhone8_PLUS:"+iPhone8_PLUS+"   ");
 85             data.append("iPhoneX:"+iPhoneX+"   ");
 86             context.write(key, new Text(data.toString()));
 87         }
 88     }
 89 
 90     public int run(String[] arg0) throws Exception {
 91         // TODO 自動生成的方法存根
 92         // TODO Auto-generated method stub
 93         Job job=Job.getInstance(getConf());
 94         job.setJarByClass(SaleManager.class);
 95         //註冊Key/Value類型為Text
 96         job.setOutputKeyClass(Text.class);
 97         job.setOutputValueClass(Text.class);
 98         //若Map的轉出Key/Value不相同是需要分別註冊
 99         job.setMapOutputKeyClass(Text.class);
100         job.setMapOutputValueClass(MapWritable.class);
101         //註冊Mapper及Reducer處理類
102         job.setMapperClass(MyMapper.class);
103         job.setReducerClass(MyReducer.class);
104         //註冊Combiner處理類
105         job.setCombinerClass(MyCombiner.class);
106         //輸入輸出數據格式化類型為TextInputFormat
107         job.setInputFormatClass(TextInputFormat.class);
108         job.setOutputFormatClass(TextOutputFormat.class);
109         //偽分散式情況下不設置時預設為1
110         job.setNumReduceTasks(1);
111         //獲取命令參數
112         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
113         //設置讀入文件路徑
114         FileInputFormat.setInputPaths(job,new Path(args[0]));
115         //
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • mysql根據查詢結果更新多條數據(插入或更新) 1.1 前言 mysql根據查詢結果執行批量更新或插入時經常會遇到1093的錯誤問題。基本上批量插入或新增都會涉及到子查詢,mysql是建議不要對需要操作的表放入子查詢條件中的,因此我們儘量避免子查詢中涉及到需要操作的表,如果無法避免,則可以考慮用連 ...
  • 參考文檔: https://docs.oracle.com/en/database/oracle/oracle-database/12.2/cwlin/index.html https://docs.oracle.com/en/database/oracle/oracle-database/12.2 ...
  • 參考文檔: https://docs.oracle.com/en/database/oracle/oracle-database/12.2/cwlin/index.html https://docs.oracle.com/en/database/oracle/oracle-database/12.2 ...
  • 一.概述 前面章節介紹了很多資料庫的優化措施,但在實際生產環境中,由於資料庫伺服器本身的性能局限,就必須要對前臺的應用來進行優化,使得前臺訪問資料庫的壓力能夠減到最小。 1. 使用連接池 對於訪問資料庫來說,建立連接的代價比較昂貴,因為連接到資料庫伺服器需要經歷多個步驟如:建立物理通道,伺服器進行初 ...
  • 常用的SQL 由淺入深 大致上回想一下自己常用的SQL,並做個記錄,目標是實現可以通過在此頁面查找到自己需要的SQL ,陸續補充 有不足之處,請提醒改正 首先我創建了兩個庫,每個庫兩張表.(工作的時候,每個公司最好有自己的資料庫模型,產品也可以看,模型工具一般用PD(power designer)什 ...
  • select request_session_id spid, OBJECT_NAME(resource_associated_entity_id) tableName from sys.dm_tran_locks where resource_type='OBJECT' 然後kill 裡面的進程 ...
  • 一、創建資料庫1.啟動服務(三種) 開始--cmd--輸入net start mssqlserver 開始--運行--net start mssqlserver 開始--服務--找到SQLSERVER(MSSQLSERVER)--右鍵--啟動 2.打開ssms 3.登錄:windows身份驗證 混合 ...
  • 在鞋廠的第一個任務,拆表。需要把訂單表按照開始日期和結束日期拆分成多條記錄,挺新鮮的~ transform方式,使用到了python。 (1)把hive表的數據傳入,通過python按照日期迴圈處理,返回多條記錄。 (2)生成序列表,然後採用cross join的方式,在hive端生成多條記錄,再根 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...