HDFS 和 HBase 的交互與編寫 MapReduce 程式類似，只需要修改輸入/輸出數據，並使用 HBase 的 Java API 對其進行操作處理即可。
HDFS 和 HBase 的交互與編寫 MapReduce 程式類似，只需要修改輸入/輸出數據，並使用 HBase 的 Java API 對其進行操作處理即可。
public class HBaseToHdfs extends ToolRunner implements Tool { private Configuration configuration; //配置文件需要配置的屬性 private static final String HDFS_NAME = "fs.defaultFS"; private static final String HDFS_VALUE = "hdfs://mycluster"; private static final String MAPREDUCE_NAME = "mapreduce.framework.name"; private static final String MAPREDUCE_VALUE = "yarn"; private static final String HBASE_NAME = "hbase.zookeeper.quorum"; private static final String HBASE_VALUE = "qiaojunlong3:2181,qiaojunlong4:2181,qiaojunlong5:2181"; //獲取hbase表的掃描對象 private Scan getscan() { return new Scan(); } @Override public int run(String[] args) throws Exception { getConf(); //獲取job實例對象 Job job = Job.getInstance(configuration, "copy_move"); //map/reduce的class鏈接 job.setMapperClass(hbase_To_Hdfs.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //設置輸入輸出 //由hbase導數據到hdfs故輸入端需要使用TableMapReduceUtil類 TableMapReduceUtil.initTableMapperJob("ns3:t5", getscan(), hbase_To_Hdfs.class, Text.class, NullWritable.class, job); FileOutputFormat.setOutputPath(job, new Path(args[0])); //設置jar包 job.setJarByClass(HBaseToHdfs.class); //提交作業 int b = job.waitForCompletion(true) ? 0 : 1; return b; } @Override public void setConf(Configuration configuration) { configuration.set(HDFS_NAME, HDFS_VALUE); configuration.set(MAPREDUCE_NAME, MAPREDUCE_VALUE); configuration.set(HBASE_NAME, HBASE_VALUE); this.configuration = configuration; } @Override public Configuration getConf() { return configuration; } public static void main(String[] args) throws Exception { ToolRunner.run(HBaseConfiguration.create(),new HBaseToHdfs() , args); } // 創建map程式 private static Text mkey = new Text();
static class hbase_To_Hdfs extends TableMapper<Text, NullWritable> { @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //定義字元串拼接 StringBuffer stringBuffer = new StringBuffer(); /** * 使用value獲取掃描器,獲取hbase表的列名/列值等信息 * 使用StringBuffer來對需要的信息進行字元串拼接 */ CellScanner cellScanner = value.cellScanner(); while (cellScanner.advance()) { Cell cell = cellScanner.current(); stringBuffer.append(new String(CellUtil.cloneValue(cell))).append("\t"); } mkey.set(stringBuffer.toString()); context.write(mkey, NullWritable.get()); } } }