前提已經安裝好hadoop的hdfs集群,可以查看 https://www.cnblogs.com/tree1123/p/10683570.html Mapreduce是hadoop的運算框架,可以對hdfs中的數據分開進行計算,先執行很多maptask,在執行reducetask,這個過程中任務的 ...
前提已經安裝好hadoop的hdfs集群,可以查看
https://www.cnblogs.com/tree1123/p/10683570.html
Mapreduce是hadoop的運算框架,可以對hdfs中的數據分開進行計算,先執行很多maptask,在執行reducetask,這個過程中任務的執行需要一個任務調度的平臺,就是yarn。
一、安裝YARN集群
yarn集群中有兩個角色:
主節點:Resource Manager 1台
從節點:Node Manager N台
Resource Manager一般安裝在一臺專門的機器上
Node Manager應該與HDFS中的data node重疊在一起
修改配置文件:yarn-site.xml
<property>
<name>yarn.resourcemanager.hostname</name>
<value>主機名</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>
然後scp到所有機器,修改主節點hadoop的slaves文件,列入要啟動nodemanager的機器,配好免密
然後,就可以用腳本啟動yarn集群:
sbin/start-yarn.sh
停止:
sbin/stop-yarn.sh
頁面:http://主節點:8088 看看node manager節點是否識別
開發一個提交job到yarn的客戶端類,mapreduce所有jar和自定義類,打成jar包上傳到hadoop集群中的任意一臺機器上,運行jar包中的(YARN客戶端類
hadoop jar ......JobSubmitter
二、開發mapreduce程式
主要需要開發:
map階段的進、出數據,
reduce階段的進、出數據,
類型都應該是實現了HADOOP序列化框架的類型,如:
String對應Text
Integer對應IntWritable
Long對應LongWritable
例子wordcount代碼:
WordcountMapper
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 切單詞
String line = value.toString();
String[] words = line.split(" ");
for(String word:words){
context.write(new Text(word), new IntWritable(1));
}
}
}
WordcountReducer
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int count = 0;
Iterator<IntWritable> iterator = values.iterator();
while(iterator.hasNext()){
IntWritable value = iterator.next();
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
public class JobSubmitter {
public static void main(String[] args) throws Exception {
// 在代碼中設置JVM系統參數,用於給job對象來獲取訪問HDFS的用戶身份
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
// 1、設置job運行時要訪問的預設文件系統
conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
// 2、設置job提交到哪去運行
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hdp-01");
// 3、如果要從windows系統上運行這個job提交客戶端程式,則需要加這個跨平臺提交的參數
conf.set("mapreduce.app-submission.cross-platform","true");
Job job = Job.getInstance(conf);
// 1、封裝參數:jar包所在的位置
job.setJar("d:/wc.jar");
//job.setJarByClass(JobSubmitter.class);
// 2、封裝參數: 本次job所要調用的Mapper實現類、Reducer實現類
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
// 3、封裝參數:本次job的Mapper實現類、Reducer實現類產生的結果數據的key、value類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
Path output = new Path("/wordcount/output");
FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");
if(fs.exists(output)){
fs.delete(output, true);
}
// 4、封裝參數:本次job要處理的輸入數據集所在路徑、最終結果的輸出路徑
FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
FileOutputFormat.setOutputPath(job, output); // 註意:輸出路徑必須不存在
// 5、封裝參數:想要啟動的reduce task的數量
job.setNumReduceTasks(2);
// 6、提交job給yarn
boolean res = job.waitForCompletion(true);
System.exit(res?0:-1);
}
}
MR還有一些高級的用法:自定義類型,自定義Partitioner,Combiner,排序,倒排索引,自定義GroupingComparator
三、mapreduce與yarn的核心機制
yarn是一個分散式程式的運行調度平臺
yarn中有兩大核心角色:
1、Resource Manager
接受用戶提交的分散式計算程式,併為其劃分資源
管理、監控各個Node Manager上的資源情況,以便於均衡負載
2、Node Manager
管理它所在機器的運算資源(cpu + 記憶體)
負責接受Resource Manager分配的任務,創建容器、回收資源
Mapreduce工作機制:
劃分輸入切片——》 環形緩衝區 ——》 分區排序 ——》Combiner 局部聚合——》shuffle ——》GroupingComparator——》輸出