MapReduce是什麼 MapReduce是一種分散式計算編程框架,是Hadoop主要組成部分之一,可以讓用戶專註於編寫核心邏輯代碼,最後以高可靠、高容錯的方式在大型集群上並行處理大量數據。 MapReduce的存儲 MapReduce的數據是存儲在HDFS上的,HDFS也是Hadoop的主要組成 ...
MapReduce是什麼
MapReduce是一種分散式計算編程框架,是Hadoop主要組成部分之一,可以讓用戶專註於編寫核心邏輯代碼,最後以高可靠、高容錯的方式在大型集群上並行處理大量數據。
MapReduce的存儲
MapReduce的數據是存儲在HDFS上的,HDFS也是Hadoop的主要組成部分之一。下邊是MapReduce在HDFS上的存儲的圖解
HDFS主要有Namenode和Datanode兩部分組成,整個集群有一個Namenode和多個DataNode,通常每一個節點一個DataNode,Namenode的主要功能是用來管理客戶端client對數據文件的操作請求和儲存數據文件的地址。DataNode主要是用來儲存和管理本節點的數據文件。節點內部數據文件被分為一個或多個block塊(block預設大小原來是64MB,後來變為128MB),然後這些塊儲存在一組DataNode中。(這裡不對HDFS做過多的介紹,後續會寫一篇詳細的HDFS筆記)
MapReduce的運行流程
1、首先把需要處理的數據文件上傳到HDFS上,然後這些數據會被分為好多個小的分片,然後每個分片對應一個map任務,推薦情況下分片的大小等於block塊的大小。然後map的計算結果會暫存到一個記憶體緩衝區內,該緩衝區預設為100M,等緩存的數據達到一個閾值的時候,預設情況下是80%,然後會在磁碟創建一個文件,開始向文件裡邊寫入數據。
2、map任務的輸入數據的格式是<key,value>對的形式,我們也可以自定義自己的<key,value>類型。然後map在往記憶體緩衝區里寫入數據的時候會根據key進行排序,同樣溢寫到磁碟的文件里的數據也是排好序的,最後map任務結束的時候可能會產生多個數據文件,然後把這些數據文件再根據歸併排序合併成一個大的文件。
3、然後每個分片都會經過map任務後產生一個排好序的文件,同樣文件的格式也是<key,value>對的形式,然後通過對key進行hash的方式把數據分配到不同的reduce裡邊去,這樣對每個分片的數據進行hash,再把每個分片分配過來的數據進行合併,合併過程中也是不斷進行排序的。最後數據經過reduce任務的處理就產生了最後的輸出。
4、在我們開發中只需要對中間map和reduce的邏輯進行開發就可以了,中間分片,排序,合併,分配都有MapReduce框架幫我完成了。
MapReduce的資源調度系統
最後我們來看一下MapReduce的資源調度系統Yarn。
Yarn的基本思想是將資源管理和作業調度/監視的功能分解為單獨的守護進程。全局唯一的ResourceManager是負責所有應用程式之間的資源的調度和分配,每個程式有一個ApplicationMaster,ApplicationMaster實際上是一個特定於框架的庫,其任務是協調來自ResourceManager的資源,並與NodeManager一起執行和監視任務。NodeManager是每台機器框架代理,監視其資源使用情況(CPU,記憶體,磁碟,網路)並將其報告給ResourceManager。
WordConut代碼
- python實現
map.py
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys
for line in sys.stdin:
words = line.strip().split()
for word in words:
print('%s\t%s' % (word, 1))
reduce.py
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys
current_word = None
sum = 0
for line in sys.stdin:
word, count = line.strip().split(' ')
if current_word == None:
current_word = word
if word != current_word:
print('%s\t%s' % (current_word, sum))
current_word = word
sum = 0
sum += int(count)
print('%s\t%s' % (current_word, sum))
我們先把輸入文件上傳到HDFS上去
hadoop fs -put /input.txt /
然後在Linux下運行,為了方便我們把命令寫成了shell文件
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
INPUT_FILE_PATH="/input.txt"
OUTPUT_FILE_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrush $OUTPUT_FILE_PATH
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH \
-output $OUTPUT_FILE_PATH \
-mapper "python map.py" \
-reducer "python reduce.py" \
-file "./map.py" \
-file "./reduce.py"
- java實現
MyMap.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MyMap extends Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
private Text text = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word: words){
text.set(word);
context.write(text,one);
}
}
}
MyReduce.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i:values){
sum+=i.get();
}
result.set(sum);
context.write(key,result);
}
}
WordCount.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "WordCount");
job.setJarByClass(WordCount.class);
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
把工程打成jar包,然後把jar包和輸入文件上傳到HDfs
$ hadoop fs -put /wordcount.jar /
$ hadoop fs -put /input.txt /
執行wordcount任務
$ bin/hadoop jar wordcount.jar WordCount /input.txt /user/joe/wordcount/output
歡迎關註公眾號:「努力給自己看」