在使用Spark時經常需要把數據落入HBase中,如果使用普通的Java API,寫入會速度很慢。還好Spark提供了Bulk寫入方式的介面。那麼Bulk寫入與普通寫入相比有什麼優勢呢? BulkLoad不會寫WAL,也不會產生flush以及split。 如果我們大量調用PUT介面插入數據,可能會導 ...
在使用Spark時經常需要把數據落入HBase中,如果使用普通的Java API,寫入會速度很慢。還好Spark提供了Bulk寫入方式的介面。那麼Bulk寫入與普通寫入相比有什麼優勢呢?
- BulkLoad不會寫WAL,也不會產生flush以及split。
- 如果我們大量調用PUT介面插入數據,可能會導致大量的GC操作。除了影響性能之外,嚴重時甚至可能會對HBase節點的穩定性造成影響。但是採用Bulk就不會有這個顧慮。
- 過程中沒有大量的介面調用消耗性能
下麵給出完整代碼:
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.conf.Configuration
/**
* Created by shaonian
*/
object HBaseBulk {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Bulk")
val sc = new SparkContext(sparkConf)
val conf = new Configuration()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableOutputFormat.OUTPUT_TABLE, "bulktest")
val job = Job.getInstance(conf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val init = sc.makeRDD(Array("1,james,32", "2,lebron,30", "3,harden,28"))
val rdd = init.map(_.split(",")).map(arr => {
val put = new Put(Bytes.toBytes(arr(0)))
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
(new ImmutableBytesWritable, put)
})
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
sc.stop()
}