環境:Spark-1.5.0 HBase-1.0.0。場景:HBase中按天分表存數據,要求將任意時間段的數據合併成一個RDD以做後續計算。嘗試1: 尋找一次讀取多個表的API,找到最接近的是一個叫MultiTableInputFormat的東西,它在MapReduce中使用良好, 但沒有找到用於....
環境:Spark-1.5.0 HBase-1.0.0。
場景:HBase中按天分表存數據,要求將任意時間段的數據合併成一個RDD以做後續計算。
嘗試1: 尋找一次讀取多個表的API,找到最接近的是一個叫MultiTableInputFormat的東西,它在MapReduce中使用良好,
但沒有找到用於RDD讀HBase的方法。
嘗試2: 每個表生成一個RDD,再用union合併,代碼邏輯如下:
var totalRDD = xxx // 讀取第一張表 for { // 迴圈讀表併合併到totalRDD val sRDD = xxx
totalRDD.union(sRDD) }
代碼放到集群上執行,totalRDD並不是正確的union結果,用var還真是不行。
嘗試3: 思路類似2,但使用SparkContext.union來一次合併多個RDD,代碼邏輯如下:
var rddSet: xxx = Set() // 創建RDD列表 dateSet.foreach(date => { // 將所有表的RDD放入列表中 val sRDD = xxx rddSet += sRDD } val totalRDD = sc.union(rddSet.toSeq) // 合併列表中的所有RDD
完整代碼如下:
import java.text.SimpleDateFormat import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark.rdd.RDD import org.apache.spark.{SparkContext, SparkConf} import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableInputFormat import scala.collection.mutable.Set /** * 時間處理類 */ object Htime { /** * 根據起止日期獲取日期列表 * 例如起止時間為20160118,20160120,那麼日期列表為(20160118,20160119,20160120) * * @param sDate 開始日期 * @param eDate 結束日期 * @return 日期列表 */ def getDateSet(sDate:String, eDate:String): Set[String] = { // 定義要生成的日期列表 var dateSet: Set[String] = Set() // 定義日期格式 val sdf = new SimpleDateFormat("yyyyMMdd") // 按照上邊定義的日期格式將起止時間轉化成毫秒數 val sDate_ms = sdf.parse(sDate).getTime val eDate_ms = sdf.parse(eDate).getTime // 計算一天的毫秒數用於後續迭代 val day_ms = 24*60*60*1000 // 迴圈生成日期列表 var tm = sDate_ms while (tm <= eDate_ms) { val dateStr = sdf.format(tm) dateSet += dateStr tm = tm + day_ms } // 日期列表作為返回 dateSet } } /** * 從HBase中讀取行為數據計算人群分類 */ object Classify { /** * @param args 命令行參數,第一個參數為行為數據開始日期,第二個為結束日期,例如20160118 */ def main(args: Array[String]) { // 命令行參數個數必須為2 if (args.length != 2) { System.err.println("參數個數錯誤") System.err.println("Usage: Classify <開始日期> <結束日期>") System.exit(1) } // 獲取命令行參數中的行為數據起止日期 val startDate = args(0) val endDate = args(1) // 根據起止日誌獲取日期列表 // 例如起止時間為20160118,20160120,那麼日期列表為(20160118,20160119,20160120) val dateSet = Htime.getDateSet(startDate, endDate) // Spark上下文 val sparkConf = new SparkConf().setAppName("Classify") val sc = new SparkContext(sparkConf) // 初始化HBase配置 val conf = HBaseConfiguration.create() // 按照日期列表讀出多個RDD存在一個Set中,再用SparkContext.union()合併成一個RDD var rddSet: Set[RDD[(ImmutableBytesWritable, Result)] ] = Set() dateSet.foreach(date => { conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date) // 設置表名 val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) rddSet += bRdd }) val behavRdd = sc.union(rddSet.toSeq) behavRdd.collect().foreach(println) } }