1. Data Format
Before writing, the data needs to be arranged into the format below and saved to HDFS. The layout used in this example is as follows (tab-separated):
row1	N
row2	M
row3	B
row4	V
row5	N
row6	M
row7	B
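For example, assuming the sample above is saved locally as data.txt (a placeholder file name), it can be uploaded to an HDFS directory named Test, which matches the input path used by the run command in section 3:

export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:$HADOOP_CLASSPATH
hadoop fs -mkdir -p Test
hadoop fs -put data.txt Test/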
2. Code
Assuming data in the above format is to be written into HBase, with column family cf and column qualifier colb, the following code can be used (for reference):
package com.testdata;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestBulkLoad {

    public static class LoadMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put> {

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "rowkey<TAB>value"; turn it into a Put for cf:colb
            String[] values = value.toString().split("\t");
            if (values.length == 2) {
                byte[] rowkey = Bytes.toBytes(values[0]);
                byte[] col_value = Bytes.toBytes(values[1]);
                byte[] family = Bytes.toBytes("cf");
                byte[] column = Bytes.toBytes("colb");
                ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey);
                Put testput = new Put(rowkey);
                testput.add(family, column, col_value);
                context.write(rowkeyWritable, testput);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.exit(0);
        }

        String in = args[0];                    // HDFS input path
        String out = args[1];                   // HDFS output path for the generated HFiles
        int unitmb = Integer.valueOf(args[2]);  // split size in MB
        String tbname = args[3];                // target HBase table name

        Configuration conf = new Configuration();
        conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
        conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
        conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
        conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));

        Job job = new Job(conf);
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));
        job.setMapperClass(LoadMapper.class);
        job.setReducerClass(PutSortReducer.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setJarByClass(TestBulkLoad.class);

        // configureIncrementalLoad sets up total-order partitioning against the
        // table's region boundaries so the generated HFiles line up with the regions
        Configuration hbaseconf = HBaseConfiguration.create();
        HTable table = new HTable(hbaseconf, tbname);
        HFileOutputFormat2.configureIncrementalLoad(job, table);

        // Run the job to generate HFiles, then bulk-load them into the table
        job.waitForCompletion(true);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseconf);
        loader.doBulkLoad(new Path(out), table);
    }
}
This code uses a MapReduce job to do the additional processing and then calls the relevant APIs to write the data into HBase: the mapper turns each input line into a Put, HFileOutputFormat2 writes the sorted output as HFiles, and the final bulk-load step moves those HFiles into the table. PutSortReducer is a reducer class that ships with HBase, so there is no need to write one yourself.
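One prerequisite worth noting: the code opens the target table with new HTable(hbaseconf, tbname), and configureIncrementalLoad reads that table's region boundaries, so the table must already exist with the cf column family before the job runs. If it does not, it can be created from the HBase shell first, for example:

hbase shell
create 'testdata', 'cf'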
3. Execution
The data is stored in a plain text file, the jar exported from the code above is named bulkload.jar, and the HBase table is named testdata. Note: set HADOOP_CLASSPATH first, as shown below, so that the HBase classes can be found and the job does not fail.
export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:$HADOOP_CLASSPATH
hadoop jar ./Downloads/bulkload.jar com.testdata.TestBulkLoad Test hbasedata 64 testdata
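The four trailing arguments correspond to args[0] through args[3] in main: Test is the HDFS input directory holding the text file, hbasedata is the HDFS output directory where the HFiles are generated (it must not exist before the job runs), 64 is the split size in MB, and testdata is the target HBase table.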
4. Results
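After the job finishes, the loaded rows can be checked from the HBase shell, for example:

hbase shell
scan 'testdata'

Each row key from the input file should appear with a cf:colb column holding the corresponding value.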