Spark讀HBase多表組成一個RDD

来源:http://www.cnblogs.com/zuohongming/archive/2016/01/25/5157495.html
-Advertisement-
Play Games

環境:Spark-1.5.0 HBase-1.0.0。場景:HBase中按天分表存數據,要求將任意時間段的數據合併成一個RDD以做後續計算。嘗試1: 尋找一次讀取多個表的API,找到最接近的是一個叫MultiTableInputFormat的東西,它在MapReduce中使用良好, 但沒有找到用於....


環境:Spark-1.5.0 HBase-1.0.0。

場景:HBase中按天分表存數據,要求將任意時間段的數據合併成一個RDD以做後續計算。

嘗試1: 尋找一次讀取多個表的API,找到最接近的是一個叫MultiTableInputFormat的東西,它在MapReduce中使用良好,

  但沒有找到用於RDD讀HBase的方法。

嘗試2: 每個表生成一個RDD,再用union合併,代碼邏輯如下:

var totalRDD = xxx  // 讀取第一張表
for {  // 迴圈讀表併合併到totalRDD
  val sRDD = xxx
  totalRDD.union(sRDD) }

代碼放到集群上執行,totalRDD並不是正確的union結果,用var還真是不行。

嘗試3: 思路類似2,但使用SparkContext.union來一次合併多個RDD,代碼邏輯如下:

var rddSet: xxx = Set()  // 創建RDD列表
dateSet.foreach(date => {  // 將所有表的RDD放入列表中
    val sRDD = xxx
    rddSet += sRDD
}
val totalRDD = sc.union(rddSet.toSeq)  // 合併列表中的所有RDD

完整代碼如下:

import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.mutable.Set

/**
  * 時間處理類
  */
object Htime {
  /**
    * 根據起止日期獲取日期列表
    * 例如起止時間為20160118,20160120,那麼日期列表為(20160118,20160119,20160120)
    *
    * @param sDate 開始日期
    * @param eDate 結束日期
    * @return 日期列表
    */
  def getDateSet(sDate:String, eDate:String): Set[String] = {
    // 定義要生成的日期列表
    var dateSet: Set[String] = Set()

    // 定義日期格式
    val sdf = new SimpleDateFormat("yyyyMMdd")

    // 按照上邊定義的日期格式將起止時間轉化成毫秒數
    val sDate_ms = sdf.parse(sDate).getTime
    val eDate_ms = sdf.parse(eDate).getTime

    // 計算一天的毫秒數用於後續迭代
    val day_ms = 24*60*60*1000

    // 迴圈生成日期列表
    var tm = sDate_ms
    while (tm <= eDate_ms) {
      val dateStr = sdf.format(tm)
      dateSet += dateStr
      tm = tm + day_ms
    }

    // 日期列表作為返回
    dateSet
  }
}

/**
  * 從HBase中讀取行為數據計算人群分類
  */
object Classify {
  /**
    * @param args 命令行參數,第一個參數為行為數據開始日期,第二個為結束日期,例如20160118
    */
  def main(args: Array[String]) {
    // 命令行參數個數必須為2
    if (args.length != 2) {
      System.err.println("參數個數錯誤")
      System.err.println("Usage: Classify <開始日期> <結束日期>")
      System.exit(1)
    }

    // 獲取命令行參數中的行為數據起止日期
    val startDate = args(0)
    val endDate   = args(1)

    // 根據起止日誌獲取日期列表
    // 例如起止時間為20160118,20160120,那麼日期列表為(20160118,20160119,20160120)
    val dateSet = Htime.getDateSet(startDate, endDate)

    // Spark上下文
    val sparkConf = new SparkConf().setAppName("Classify")
    val sc = new SparkContext(sparkConf)

    // 初始化HBase配置
    val conf = HBaseConfiguration.create()

    // 按照日期列表讀出多個RDD存在一個Set中,再用SparkContext.union()合併成一個RDD
    var rddSet: Set[RDD[(ImmutableBytesWritable, Result)] ] = Set()
    dateSet.foreach(date => {
      conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date) // 設置表名
      val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      rddSet += bRdd
    })
    
    val behavRdd = sc.union(rddSet.toSeq)
    
    behavRdd.collect().foreach(println)
  }
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 使用資料庫的過程中,由於斷電或其他原因,有可能導致資料庫出現一些小錯誤,比如檢索某些表特別慢,查詢不到符合條件的數據等。出現這些情況的原因,往往是因為資料庫有些損壞,或索引不完整。在ACCESS中,有個修複資料庫的功能可以解決這個問題,在SQL企業管理器,沒有這個功能,要用語句來完成,下麵就介紹如何...
  • Mysql ERROR 1064 (42000)
  • MyCAT日誌對於瞭解MyCAT的運行信息不可獲取,譬如MyCAT是否採用讀寫分離,對於一個查詢語句,MyCAT是怎樣執行的,每個分片會分發到哪個節點上等等。預設是info級別,通過log4j.xml可將其設置debug級別,這樣就可獲得更多有關MyCAT運行的內部信息。下麵通過對MyCAT的啟動以...
  • 常常需要將資料庫中的數據生成文檔,由於比較喜歡腳本的方式,所以就需要使用spool的時候進行格式設置,以下簡單整理了一下oracle中進行格式設置的一些東西,一共十八條,其實常用的也就那麼幾個,稍後會附上自己寫的簡單的shell操作的腳本,希望能供同樣有需要的共同交流,也作為自己的備份。set命令的...
  • MyCAT預設字元集是UTF8下麵通過查看日誌來驗證不同的MySQL客戶端字元集和伺服器字元集對於MyCAT的影響。日誌中與字元集有關的主要有三部分:1. 初始化MyCAT連接池2. 心跳檢測3. 在執行SQL語句時的連接同步。因為MyCAT實現的是三節點的讀寫分離和自動切換,以下修改的均是loca...
  • 用戶定義函數(UDF)分類 SQL SERVER中的用戶定義函數(User Defined Functions 簡稱UDF)分為標量函數(Scalar-Valued Function)和表值函數(Table-Valued Function)。其中表值函數又分為Inline table-valued ...
  • 1、環境準備:1 mkdir /home/mongodb #創建MongoDB程式存放目錄2 mkdir /data/mongodata -p #創建數據存放目錄3 mkdir /data/log/mongolog -p #創建日誌存放目錄2、下載:1 curl -...
  • IdNameRegisterDate1澎澎2007/1/5 00:00:002丁丁2007/1/6 04:37:003亞亞2007/1/7 00:00:00資料庫的數據如上。若以RegisterDate為查詢條件,找出'丁丁'這條記錄,則查詢語句為SELECT ID, Name, RegisterD...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...