關於mapreduce的一些註意細節 如果把mapreduce程式打包放到了liux下去運行, 命令java –cp xxx.jar 主類名 如果報錯了,說明是缺少相關的依賴jar包 用命令hadoop jar xxx.jar 類名因為在集群機器上用 hadoop jar xx.jar mr.wc. ...
關於mapreduce的一些註意細節
如果把mapreduce程式打包放到了liux下去運行,
命令java –cp xxx.jar 主類名
如果報錯了,說明是缺少相關的依賴jar包
用命令hadoop jar xxx.jar 類名因為在集群機器上用 hadoop jar xx.jar mr.wc.JobSubmitter 命令來啟動客戶端main方法時,hadoop jar這個命令會將所在機器上的hadoop安裝目錄中的jar包和配置文件加入到運行時的classpath中
那麼,我們的客戶端main方法中的new Configuration()語句就會載入classpath中的配置文件,自然就有了
fs.defaultFS 和 mapreduce.framework.name 和 yarn.resourcemanager.hostname 這些參數配置
會把本地hadoop的相關的所有jar包都會引用
Mapreduce也有本地的job運行,就是可以不用提交到yarn上,可以以單機的模式跑一邊以多個線程模擬也可以。
就是如果不管在Linux下還是windows下,提交job都會預設的提交到本地去運行,
如果在linux預設提交到yarn上運行,需要寫配置文件hadoop/etc/mapred-site.xml文件
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
Key,value對,如果是自己的類的話,那麼這個類要實現Writable,同時要把你想序列化的數據轉化成二進位,然後放到重寫方法wirte參數的DataOutput裡面,另一個readFields重寫方法是用來反序列化用的,
註意反序列化的時候,會先拿這個類的無參構造方法構造出一個對象出來,然後再通過readFields方法來複原這個對象。
DataOutput也是一種流,只不過是hadoop的在封裝,自己用的時候,裡面需要加個FileOutputStream對象
DataOutput寫字元串的時候要用writeUTF(“字元串”),他這樣編碼的時候,會在字元串的前面先加上字元串的長度,這是考慮到字元編碼對其的問題,hadoop解析的時候就會先讀前面兩個位元組,看一看這個字元串有多長,不然如果用write(字元串.getBytes())這樣他不知道這個字元串到底有多少個位元組。
在reduce階段,如果把一個對象寫到hdfs裡面,那麼會調用字元串的toString方法,你可以重寫這個類的toString方法
舉例,下麵這個類就可以在hadoop里序列化
package mapreduce2; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Waitable; public class FlowBean implements Writable { private int up;//上行流量 private int down;//下行流量 private int sum;//總流量 private String phone;//電話號 public FlowBean(int up, int down, String phone) { this.up = up; this.down = down; this.sum = up + down; this.phone = phone; } public int getUp() { return up; } public void setUp(int up) { this.up = up; } public int getDown() { return down; } public void setDown(int down) { this.down = down; } public int getSum() { return sum; } public void setSum(int sum) { this.sum = sum; } public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } @Override public void readFields(DataInput di) throws IOException { //註意這裡讀的順序要和寫的順序是一樣的 this.up = di.readInt(); this.down = di.readInt(); this.sum = this.up + this.down; this.phone = di.readUTF(); } @Override public void write(DataOutput Do) throws IOException { Do.writeInt(this.up); Do.writeInt(this.down); Do.writeInt(this.sum); Do.writeUTF(this.phone); } @Override public String toString() { return "電話號"+this.phone+" 總流量"+this.sum; } }
當所有的reduceTask都運行完之後,還會調用一個cleanup方法
應用練習:統計一個頁面訪問總量為n條的數據
方案一:只用一個reducetask,利用cleanup方法,在reducetask階段,先不直接放到hdfs裡面,而是存到一個Treemap裡面
再在reducetask結束後,在cleanup裡面通過把Treemap裡面前五輸出到HDFS裡面;
package cn.edu360.mr.page.topn; public class PageCount implements Comparable<PageCount>{ private String page; private int count; public void set(String page, int count) { this.page = page; this.count = count; } public String getPage() { return page; } public void setPage(String page) { this.page = page; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } @Override public int compareTo(PageCount o) { return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count; } }
map類
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class PageTopnMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split(" "); context.write(new Text(split[1]), new IntWritable(1)); } }
reduce類
package cn.edu360.mr.page.topn; import java.io.IOException; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class PageTopnReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ TreeMap<PageCount, Object> treeMap = new TreeMap<>(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count += value.get(); } PageCount pageCount = new PageCount(); pageCount.set(key.toString(), count); treeMap.put(pageCount,null); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration();
//可以在cleanup裡面拿到configuration,從裡面讀取要拿前幾條數據 int topn = conf.getInt("top.n", 5); Set<Entry<PageCount, Object>> entrySet = treeMap.entrySet(); int i= 0; for (Entry<PageCount, Object> entry : entrySet) { context.write(new Text(entry.getKey().getPage()), new IntWritable(entry.getKey().getCount())); i++; if(i==topn) return; } } }
然後jobSubmit類,註意這個要設定Configuration,這裡面有幾種方法
第一種是載入配置文件
Configuration conf = new Configuration(); conf.addResource("xx-oo.xml");
然後再在xx-oo.xml文件裡面寫
<configuration> <property> <name>top.n</name> <value>6</value> </property> </configuration>
第二種方式
//通過直接設定 conf.setInt("top.n", 3); //通過對java主程式 直接傳進來的參數 conf.setInt("top.n", Integer.parseInt(args[0]));
第三種方式通過獲取配置文件參數
Properties props = new Properties(); props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties")); conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));
然後再在topn.properties裡面配置參數
top.n=5
subsubmit類,預設在本機模擬運行
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class JobSubmitter { public static void main(String[] args) throws Exception { /** * 通過載入classpath下的*-site.xml文件解析參數 */ Configuration conf = new Configuration(); conf.addResource("xx-oo.xml"); /** * 通過代碼設置參數 */ //conf.setInt("top.n", 3); //conf.setInt("top.n", Integer.parseInt(args[0])); /** * 通過屬性配置文件獲取參數 */ /*Properties props = new Properties(); props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties")); conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));*/ Job job = Job.getInstance(conf); job.setJarByClass(JobSubmitter.class); job.setMapperClass(PageTopnMapper.class); job.setReducerClass(PageTopnReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\url\\input")); FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\url\\output")); job.waitForCompletion(true); } }
有時一個任務一個mapreduce是完成不了的,有可能會拆分成兩個或多個mapreduce
map階段會有自己的排序機制,比如一組數據(a,1),(b,1),(a,1),(c,1),他會先處理key為1的一組數據,
這個排序機制我們也可以自己去實現,要對這個類實現Comparable介面,然後重寫compareTo方法。
但要註意這個排序機制只是對於一個reducetask來說的,如果有多個的話,只會得到局部排序。
如果要多個reducetask的話,我們就需要控制數據的分發規則,這樣雖然是會生成多個排序後的文件,但這些文件整體上依然是有序的。因為我們控制了每一個reducetask處理數據的範圍。
額外java知識點補充
Treemap,放進去的東西會自動排序
兩種Treemap的自定義方法,第一種是傳入一個Comparator
public class TreeMapTest { public static void main(String[] args) { TreeMap<FlowBean, String> tm1 = new TreeMap<>(new Comparator<FlowBean>() { @Override public int compare(FlowBean o1, FlowBean o2) { //如果兩個類總流量相同的會比較電話號 if( o2.getAmountFlow()-o1.getAmountFlow()==0){ return o1.getPhone().compareTo(o2.getPhone()); } //如果流量不同,就按從小到大的順序排序 return o2.getAmountFlow()-o1.getAmountFlow(); } }); FlowBean b1 = new FlowBean("1367788", 500, 300); FlowBean b2 = new FlowBean("1367766", 400, 200); FlowBean b3 = new FlowBean("1367755", 600, 400); FlowBean b4 = new FlowBean("1367744", 300, 500); tm1.put(b1, null); tm1.put(b2, null); tm1.put(b3, null); tm1.put(b4, null); //treeset的遍歷 Set<Entry<FlowBean,String>> entrySet = tm1.entrySet(); for (Entry<FlowBean,String> entry : entrySet) { System.out.println(entry.getKey() +"\t"+ entry.getValue()); } } }
第二種是在這個類中,實現一個Comparable介面
package cn.edu360.mr.page.topn; public class PageCount implements Comparable<PageCount>{ private String page; private int count; public void set(String page, int count) { this.page = page; this.count = count; } public String getPage() { return page; } public void setPage(String page) { this.page = page; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } @Override public int compareTo(PageCount o) { return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count; } }