I. MapJoin Example
1. Requirement: there are two input files, an order table and a product table.
The order table has three fields: order timestamp, product id, and order id (it plays the role of the large table).
The product table has two fields: product id and product name (it plays the role of the small table, which is loaded into memory).
The result file should append, to each line of the order table, the product name that matches that line's product id.
2. Approach:
Load the product table into memory in setup(), then in the map() method append the matching product name to the end of each order line. No Reducer is needed, so in the Driver class distribute the product file with job.addCacheFile() and set the number of reduce tasks to 0.
3. The code:
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private HashMap<String, String> pdMap = new HashMap<>();

    // 1. Load the product table into memory
    @Override
    protected void setup(Context context) throws IOException {
        // Read the cached file
        BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));
        String line;
        while ((line = br.readLine()) != null) {
            // Split the line
            String[] fields = line.split("\t");
            // Cache: product id -> product name
            pdMap.put(fields[0], fields[1]);
        }
        br.close();
    }

    // 2. Map-side join
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get one line of order data
        String line = value.toString();
        // Split the line
        String[] fields = line.split("\t");
        // Product id of this order
        String pid = fields[1];
        // Look up the product name by product id
        String pName = pdMap.get(pid);
        // Append the product name
        line = line + "\t" + pName;
        // Emit
        context.write(new Text(line), NullWritable.get());
    }
}

public class CacheDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException,
            InterruptedException, URISyntaxException {
        // 1. Get job info
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar
        job.setJarByClass(CacheDriver.class);

        // 3. Set the custom mapper (no reducer is needed)
        job.setMapperClass(CacheMapper.class);

        // 4. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("c://table1029//in"));
        FileOutputFormat.setOutputPath(job, new Path("c://table1029//out"));

        // 6. Add the product file to the distributed cache
        job.addCacheFile(new URI("file:///c:/inputcache/pd.txt"));

        // Set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        // 7. Submit the job
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}
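The heart of the map-side join above is the per-record HashMap lookup in map(). Stripped of the Hadoop plumbing, that logic can be sketched in plain Java (the MapJoinSketch class and its join method are illustrative names, not part of the job above):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the map-side join lookup, without the Hadoop API.
// pdMap plays the role of the cached product table loaded in setup().
public class MapJoinSketch {

    // Append the product name to each tab-separated order line,
    // mirroring what CacheMapper.map() does for each record.
    public static List<String> join(Map<String, String> pdMap, List<String> orderLines) {
        List<String> out = new ArrayList<>();
        for (String line : orderLines) {
            String[] fields = line.split("\t");
            String pid = fields[1];                       // product id is the 2nd column
            String pName = pdMap.getOrDefault(pid, "");   // empty string if no match
            out.add(line + "\t" + pName);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> pdMap = new HashMap<>();
        pdMap.put("01", "xiaomi");
        List<String> joined = join(pdMap, Arrays.asList("20190330\t01\t1001"));
        System.out.println(joined.get(0));  // order line with "xiaomi" appended
    }
}
```

Because the small table lives entirely in memory, the join finishes in the map phase and no shuffle or reduce is needed, which is why the driver sets the reduce task count to 0.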
II. ReduceJoin Example
1. Requirement: with the same two data files as above, replace the product id in each order record with the corresponding product name.
2. Approach: define a TableBean class with the fields timestamp, product id, order id, product name, and flag (the flag marks which table a record came from).
In the Mapper, read both tables; obtain the input split from the context object, use the split's file name to determine which table the current record belongs to, wrap the record in a TableBean, and emit it with the product id as the key and the TableBean as the value.
On the Reducer side, all values in one reduce call share the same key (the product id). Use the flag to tell the tables apart: copy the single product-table record (product id and product name) into one TableBean, and collect the order-table records into an ArrayList of TableBean objects; then set each buffered order record's product name from the product record and write each TableBean out as the key.
3. The code:
/**
 * @author: PrincessHug
 * @date: 2019/3/30, 2:37
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableBean implements Writable {
    private String timeStamp;
    private String productId;
    private String orderId;
    private String productName;
    private String flag;

    public TableBean() {
    }

    public String getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(String timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(timeStamp);
        out.writeUTF(productId);
        out.writeUTF(orderId);
        out.writeUTF(productName);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        timeStamp = in.readUTF();
        productId = in.readUTF();
        orderId = in.readUTF();
        productName = in.readUTF();
        flag = in.readUTF();
    }

    @Override
    public String toString() {
        return timeStamp + "\t" + productName + "\t" + orderId;
    }
}

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get the file name from the input split to tell the tables apart
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();

        // Read one line and prepare the output bean and key
        String line = value.toString();
        TableBean tb = new TableBean();
        Text t = new Text();

        // Decide which table this record came from
        if (name.contains("order.txt")) {
            // Order record: timestamp \t product id \t order id
            String[] fields = line.split("\t");
            tb.setTimeStamp(fields[0]);
            tb.setProductId(fields[1]);
            tb.setOrderId(fields[2]);
            tb.setProductName("");
            tb.setFlag("0");
            t.set(fields[1]);
        } else {
            // Product record: product id \t product name
            String[] fields = line.split("\t");
            tb.setTimeStamp("");
            tb.setProductId(fields[0]);
            tb.setOrderId("");
            tb.setProductName(fields[1]);
            tb.setFlag("1");
            t.set(fields[0]);
        }
        context.write(t, tb);
    }
}

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context)
            throws IOException, InterruptedException {
        // Buffers for the order records and the single product record of this key
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean productBean = new TableBean();

        // Walk the values; the flag tells order records from the product record
        for (TableBean v : values) {
            if (v.getFlag().equals("0")) {
                // Hadoop reuses the value object, so copy it before buffering
                TableBean tableBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tableBean, v);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tableBean);
            } else {
                try {
                    BeanUtils.copyProperties(productBean, v);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // Join: fill in the product name on every order record and emit
        for (TableBean ob : orderBeans) {
            ob.setProductName(productBean.getProductName());
            context.write(ob, NullWritable.get());
        }
    }
}

public class TableDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Job info
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Jar
        job.setJarByClass(TableDriver.class);

        // Mapper and Reducer
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        // Reducer output types
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\reducejoin\\in"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\reducejoin\\out"));

        // Submit the job
        if (job.waitForCompletion(true)) {
            System.out.println("Job completed!");
        } else {
            System.out.println("Job failed!");
        }
    }
}
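The essence of the reduce-side join is what happens inside one reduce() call: all records sharing a product id arrive together, one of them is the product record, and the rest are order records. That per-group merge can be sketched in plain Java, without the Hadoop API (the ReduceJoinSketch class and the two-element {flag, payload} record encoding are illustrative, not part of the job above):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the reduce-side join for one key group (one product id).
// Each record is a String[2]: {flag, payload}. Flag "0" = order row whose
// payload is "timestamp\tproductId\torderId"; flag "1" = product row whose
// payload is the product name. This mirrors TableReducer.reduce().
public class ReduceJoinSketch {

    public static List<String> joinGroup(List<String[]> records) {
        String productName = "";
        List<String[]> orders = new ArrayList<>();

        // Separate the single product record from the buffered order records
        for (String[] r : records) {
            if (r[0].equals("1")) {
                productName = r[1];
            } else {
                orders.add(r);
            }
        }

        // Emit "timestamp \t productName \t orderId", as TableBean.toString() does
        List<String> out = new ArrayList<>();
        for (String[] o : orders) {
            String[] f = o[1].split("\t");
            out.add(f[0] + "\t" + productName + "\t" + f[2]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> group = new ArrayList<>();
        group.add(new String[]{"1", "xiaomi"});
        group.add(new String[]{"0", "20190330\t01\t1001"});
        for (String line : joinGroup(group)) {
            System.out.println(line);  // order line with product id replaced by name
        }
    }
}
```

Compared with the MapJoin version, this variant pays the full shuffle cost of moving every order record to a reducer, but it works even when the product table is too large to fit in each mapper's memory.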