Spark連接HBase

来源:https://www.cnblogs.com/asura7969/archive/2018/02/28/8483917.html
-Advertisement-
Play Games

(一)、Spark讀取HBase中的數據 hbase中的數據 (二)、Spark寫HBase 1.第一種方式: 2.第二種方式: ...


(一)、Spark讀取HBase中的數據

hbase中的數據

 

 1 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
 2 import org.apache.hadoop.hbase.client.HBaseAdmin
 3 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 4 import org.apache.spark._
 5 import org.apache.hadoop.hbase.util.Bytes
 6 
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 從hbase讀取數據轉化成RDD
11   */
12 object SparkReadHBase {
13 
14   def main(args: Array[String]): Unit = {
15     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
16     val sc = new SparkContext(sparkConf)
17 
18     val tablename = "account"
19     val conf = HBaseConfiguration.create()
20     //設置zooKeeper集群地址,也可以通過將hbase-site.xml導入classpath,但是建議在程式里這樣設置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //設置zookeeper連接埠,預設2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24     conf.set(TableInputFormat.INPUT_TABLE, tablename)
25 
26     // 如果表不存在則創建表
27     val admin = new HBaseAdmin(conf)
28     if (!admin.isTableAvailable(tablename)) {
29       val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
30       admin.createTable(tableDesc)
31     }
32 
33     //讀取數據並轉化成rdd
34     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
35       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
36       classOf[org.apache.hadoop.hbase.client.Result])
37 
38     val count = hBaseRDD.count()
39     println(count)
40     hBaseRDD.foreach{case (_,result) =>{
41       //獲取行鍵
42       val key = Bytes.toString(result.getRow)
43       //通過列族和列名獲取列
44       val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
45       val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
46       println("Row key:"+key+" Name:"+name+" Age:"+age)
47     }}
48 
49     sc.stop()
50     admin.close()
51   }
52 
53 }

(二)、Spark寫HBase

  1.第一種方式:

 1 import org.apache.hadoop.hbase.HBaseConfiguration
 2 import org.apache.hadoop.hbase.client.Put
 3 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 4 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 5 import org.apache.hadoop.hbase.util.Bytes
 6 import org.apache.hadoop.mapred.JobConf
 7 import org.apache.spark.{SparkConf, SparkContext}
 8 import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
 9 /**
10   * Created by *** on 2018/2/12.
11   *
12   * 使用saveAsHadoopDataset寫入數據
13   */
14 object SparkWriteHBaseOne {
15   def main(args: Array[String]): Unit = {
16     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
17     val sc = new SparkContext(sparkConf)
18 
19     val conf = HBaseConfiguration.create()
20     //設置zooKeeper集群地址,也可以通過將hbase-site.xml導入classpath,但是建議在程式里這樣設置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //設置zookeeper連接埠,預設2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24 
25     val tablename = "account"
26 
27     //初始化jobconf,TableOutputFormat必須是org.apache.hadoop.hbase.mapred包下的!
28     val jobConf = new JobConf(conf)
29     jobConf.setOutputFormat(classOf[TableOutputFormat])
30     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
31 
32     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
33 
34 
35     val rdd = indataRDD.map(_.split(',')).map{arr=>{
36       /*一個Put對象就是一行記錄,在構造方法中指定主鍵
37        * 所有插入的數據必須用org.apache.hadoop.hbase.util.Bytes.toBytes方法轉換
38        * Put.add方法接收三個參數:列族,列名,數據
39        */
40       val put = new Put(Bytes.toBytes(arr(0).toInt))
41       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
42       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
43       //轉化成RDD[(ImmutableBytesWritable,Put)]類型才能調用saveAsHadoopDataset
44       (new ImmutableBytesWritable, put)
45     }}
46 
47     rdd.saveAsHadoopDataset(jobConf)
48 
49     sc.stop()
50   }
51 }

  2.第二種方式:

 1 import org.apache.hadoop.hbase.client.{Put, Result}
 2 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 3 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 4 import org.apache.hadoop.hbase.util.Bytes
 5 import org.apache.hadoop.mapreduce.Job
 6 import org.apache.spark._
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 使用saveAsNewAPIHadoopDataset寫入數據
11   */
12 object SparkWriteHBaseTwo {
13   def main(args: Array[String]): Unit = {
14     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
15     val sc = new SparkContext(sparkConf)
16 
17     val tablename = "account"
18 
19     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","node02,node03,node04")
20     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
21     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
22 
23     val job = new Job(sc.hadoopConfiguration)
24     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
25     job.setOutputValueClass(classOf[Result])
26     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
27 
28     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
29     val rdd = indataRDD.map(_.split(',')).map{arr=>{
30       val put = new Put(Bytes.toBytes(arr(0)))
31       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
32       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
33       (new ImmutableBytesWritable, put)
34     }}
35 
36     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
37   }
38 }

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 來源地址:https://baike.baidu.com/item/NoSQL/8828247?fr=aladdin NoSQL,泛指非關係型的資料庫。隨著互聯網web2.0網站的興起,傳統的關係資料庫在應付web2.0網站,特別是超大規模和高併發的SNS類型的web2.0純動態網站已經顯得力不從心 ...
  • Window XP系統上安裝Oracle Database 10G,在Mac系統上使用Navicat遠程連接oracle資料庫(不需要安裝oracle客戶端,Navicat 11 已經集成了該客戶端) ...
  • 最近同事在交接工作時,發現有幾個schedule job沒有執行成功,我這邊給看了下,其中一個是由於資料庫遷移,調用dblink的host主機IP在tnsnames中沒有變更導致,還有一個是無法視圖的報錯,即報錯信息如下: 一、錯誤日誌 通過查看schedual job報錯日誌,具體報錯信息如下 O ...
  • 現實工作中會有多個數據源同步到一個資料庫完成數據分析的場景,這些數據可以不是實時同步的,我們一般通過定時任務抽取數據到統計分析庫給應用使用。 一般的同步方式可以通過時間戳做全量和增量數據同步(存在原數據變化可能,數據不一致的情況),也可以通過dblink做數據實時查詢(較損耗線上資料庫性能),一般最 ...
  • 官方下載網站:http://www.sqltolinq.com/ 本文介紹版本為Linqer 4.5.7 第一步:下載下來,解壓,雙擊安裝.exe文件,運行界面如下。 第二步:建立與資料庫的連接 點擊左上角的New Connection,彈出對話框,填寫Name,點擊Connection Strin ...
  • 寫在前面 目前,在系統設計中引入了越來越多的NoSQL產品,例如Redis/ MongoDB/ HBase等,其中性能指標往往會成為權衡不同NoSQL產品的關鍵因素。對這些產品在性能表現和產品選擇上的爭論,Ivan碰到不止一次。雖然通過對系統架構原理方面的分析可以大致判斷出其在不同讀寫場景下的表現, ...
  • Redis數據類型: Redis支持五種數據類型:string(字元串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。 1、String(字元串) string是redis最基本的類型,你可以理解成與Memcached一模一樣的類型,一個key對應 ...
  • 1.sql語句邏輯執行順序 (7) SELECT (8) DISTINCT <select_list> (1) FROM <left_table> (3) <join_type> JOIN <right_table> (2) ON <join_condition> (4) WHERE <where_ ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...