HBase Shell操作&Flink寫入HBase

来源:https://www.cnblogs.com/toycon/archive/2023/12/23/17923523.html
-Advertisement-
Play Games

一、HBase Shell操作 1、基本操作 1)進入HBase客戶端命令行 [root@bigdata1 hbase]$ bin/hbase shell 2)查看幫助命令 hbase(main):001:0> help 3)查看當前資料庫中有哪些表 hbase(main):002:0> list ...


一、HBase Shell操作

1、基本操作

1)進入HBase客戶端命令行

    [root@bigdata1 hbase]$ bin/hbase shell

2)查看幫助命令

    hbase(main):001:0> help

3)查看當前資料庫中有哪些表

    hbase(main):002:0> list

2、表的操作

1)創建表

    hbase(main):002:0> create 'student','info'

2)插入數據到表

    hbase(main):003:0> put 'student','1001','info:sex','male'
    hbase(main):004:0> put 'student','1001','info:age','18'
    hbase(main):005:0> put 'student','1002','info:name','Janna'
    hbase(main):006:0> put 'student','1002','info:sex','female'
    hbase(main):007:0> put 'student','1002','info:age','20'

3)掃描查看表數據

    hbase(main):008:0> scan 'student'
    hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW  => '1001'}
    hbase(main):010:0> scan 'student',{STARTROW => '1001'}

4)查看表結構

    hbase(main):011:0> describe 'student'

5)更新指定欄位的數據

    hbase(main):012:0> put 'student','1001','info:name','Nick'
    hbase(main):013:0> put 'student','1001','info:age','100'

6)查看“指定行”或“指定列族:列”的數據

    hbase(main):014:0> get 'student','1001'
    hbase(main):015:0> get 'student','1001','info:name'

7)統計表數據行數

    hbase(main):021:0> count 'student'

8)刪除數據

    刪除某rowkey的全部數據:
    hbase(main):016:0> deleteall 'student','1001'
    刪除某rowkey的某一列數據:
    hbase(main):017:0> delete 'student','1002','info:sex'

9)清空表數據

    hbase(main):018:0> truncate 'student'
    提示:清空表的操作順序為先disable,然後再truncate。

10)刪除表

    首先需要先讓該表為disable狀態:
    hbase(main):019:0> disable 'student'
    然後才能drop這個表:
    hbase(main):020:0> drop 'student'
    提示:如果直接drop表,會報錯:ERROR: Table student is enabled. Disable it first.

11)變更表信息

    將info列族中的數據存放3個版本:
    hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
    hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}

二、Flink整合HBase寫入操作

現在需要將Flink處理的數據存入HBase資料庫(namespace)shtd_result的order_info表中,rowkey為id的值,然後在Linux的HBase shell命令行中查詢列consignee,並查詢出任意5條

表空間為:shtd_result,表為order_info,列族為:info
表結構為:

欄位 類型 註釋
rowkey string HBase的主鍵,值為id
id bigint
consignee string
consignee_tel string
final_total_amount double
order_status string
user_id bigint
delivery_address string
order_comment string
out_trade_no string
trade_body string
create_time string 轉成yyyy-MM-dd hh:mm:ss格式的的字元串
operate_time string 轉成yyyy-MM-dd hh:mm:ss格式的的字元串
expire_time string 轉成yyyy-MM-dd hh:mm:ss格式的的字元串
tracking_no string
parent_order_id bigint
img_url string
province_id int
benefit_reduce_amount double

我們需要寫一個WriteToHBase類,集成自RichSinkFunction,RichSinkFunction 是一個抽象類,提供了一個更為豐富的介面,用於實現自定義的 Sink(接收器)功能。

在Scala api中RichSinkFunction的主要方法有open,invoke以及close。

  • open(Configuration parameters):

    這個方法在 Sink 函數初始化時被調用,通常用於一次性的設置工作,例如打開資料庫連接或初始化狀態。
    參數 parameters 提供了訪問 Flink 配置的能力。

  • invoke(value: T, context: SinkFunction.Context):

    這是核心方法,用於處理每條流入的數據。
    value 參數代表當前的數據元素。
    context 提供了此元素的上下文信息,如當前的處理時間或事件時間。

  • close():

    當 Sink 不再接收數據時調用此方法,用於執行清理工作,如關閉資料庫連接。
    這個方法是在最後一次調用 invoke 方法後執行。

瞭解了這些方法後,我們來寫一下WriteToHBase

一、WriteToHBase的實現

  class WriteToHBase extends RichSinkFunction[OrderData] {
    @transient private var connection: Connection = _
    @transient private var table: Table = _

    override def open(parameters: Configuration): Unit = {
      val config = HBaseConfiguration.create()
      // 設置HBase配置, 如Zookeeper地址等
       config.set("hbase.zookeeper.quorum", "bigdata1:2181")

      connection = ConnectionFactory.createConnection(config)
      table = connection.getTable(TableName.valueOf("shtd_result:order_info"))
    }

    override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
      // 將 id 轉換為行鍵(假設 id 是唯一的)
      val rowKey = Bytes.toBytes(value.id.toString)

      // 為該行創建一個新的 Put 實例
      val put = new Put(rowKey)

      // 向 Put 實例中添加列
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee"), Bytes.toBytes(value.consignee))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee_tel"), Bytes.toBytes(value.consignee_tel))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("final_total_amount"), Bytes.toBytes(value.final_total_amount))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_status"), Bytes.toBytes(value.order_status))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(value.user_id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("delivery_address"), Bytes.toBytes(value.delivery_address))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_comment"), Bytes.toBytes(value.order_comment))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("out_trade_no"), Bytes.toBytes(value.out_trade_no))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("trade_body"), Bytes.toBytes(value.trade_body))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("operate_time"), Bytes.toBytes(value.operate_time))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("expire_time"), Bytes.toBytes(value.expire_time))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("tracking_no"), Bytes.toBytes(value.tracking_no))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("parent_order_id"), Bytes.toBytes(value.parent_order_id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("img_url"), Bytes.toBytes(value.img_url))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("province_id"), Bytes.toBytes(value.province_id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("benefit_reduce_amount"), Bytes.toBytes(value.benefit_reduce_amount))

      table.put(put)
    }

    override def close(): Unit = {
      if (table != null) {
        table.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }

在 Scala 和 Java 中,@transient 關鍵字用於標記一個類的成員變數為“暫時的”(transient),這意味著這個變數不會被預設的序列化過程式列化。

在 Flink中,通常用於:

  • 防止序列化問題:當一個對象需要在不同的機器或上下文中傳遞時,某些屬性可能不支持序列化(例如,資料庫連接),或者序列化這些屬性沒有意義(例如,臨時緩存)。使用 @transient 可以避免這些欄位在對象序列化時引發錯誤。

  • 減少網路開銷:對於不需要跨節點傳輸的欄位,使用 @transient 可以減少不必要的網路傳輸開銷。

在 WriteToHBase 類中,connection 和 table 作為 HBase 的連接和表實例,通常不支持序列化,也不應該被序列化。所以,它們被標記為 @transient。


上面的向 Put 實例中添加列過於冗長,可以用反射來代替:

def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
  //獲取運行時鏡像和實例鏡像
  val mirror = runtimeMirror(getClass.getClassLoader)
  val instanceMirror = mirror.reflect(data) 
  //獲取類成員並過濾方法
  val members = typeOf[T].members.sorted.filterNot(_.isMethod)

  //遍歷欄位並添加到 Put 實例
  members.foreach { m =>
    val fieldMirror = instanceMirror.reflectField(m.asTerm)
    val name = m.name.toString.trim
    val value = fieldMirror.get.toString

    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
  }
}

override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
  val rowKey = Bytes.toBytes(value.id.toString)
  val put = new Put(rowKey)
  val infoCF = "info"

  // 使用反射自動添加列
  addColumnsUsingReflection(put, infoCF, value)

  table.put(put)
}

現在來解釋一下這段代碼:

addColumnsUsingReflection 函數定義

def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
  • [T: TypeTag]:類型參數 T,帶有一個上下文界定 TypeTag,這使得可以在運行時獲取類型 T 的信息。
  • (put: Put, cf: String, data: T):函數接受三個參數:put 是 HBase 的 Put 實例,cf 是列族名,data 是要插入的數據對象。

獲取運行時鏡像和實例鏡像

  val mirror = runtimeMirror(getClass.getClassLoader)
  val instanceMirror = mirror.reflect(data)
  • val mirror:創建一個 Mirror 實例,它是反射 API 的入口點。
  • runtimeMirror(getClass.getClassLoader):獲取當前類的類載入器的運行時鏡像。
  • val instanceMirror:反射 data 對象,得到一個可以用來訪問 data 實例成員的 InstanceMirror

獲取類成員並過濾方法

  val members = typeOf[T].members.sorted.filterNot(_.isMethod)
  • typeOf[T]:獲取類型 T 的類型信息。
  • .members:獲取類型 T 的所有成員(欄位和方法)。
  • .sorted:對成員進行排序(預設按名稱)。
  • .filterNot(_.isMethod):過濾掉方法成員,只保留欄位。

遍歷欄位並添加到 Put 實例

  members.foreach { m =>
    val fieldMirror = instanceMirror.reflectField(m.asTerm)
    val name = m.name.toString.trim
    val value = fieldMirror.get.toString
    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
  }
  • 遍歷所有欄位。
  • val fieldMirror:反射每個欄位,得到一個可以操作欄位的 FieldMirror
  • m.asTerm:將成員 m 轉換為一個 term(欄位)。
  • val name:獲取欄位名,並去除首尾空格。
  • val value:獲取欄位的值並轉換為字元串。
  • put.addColumn:向 Put 實例添加列,列族為 cf,列名為欄位名 name,值為欄位值 value

invoke 方法

override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
  val rowKey = Bytes.toBytes(value.id.toString)
  val put = new Put(rowKey)
  val infoCF = "info"
  addColumnsUsingReflection(put, infoCF, value)
  table.put(put)
}
  • override def invoke:重寫 RichSinkFunctioninvoke 方法。
  • val rowKey:將 OrderDataid 欄位轉換為位元組作為行鍵。
  • val put:創建一個新的 Put 實例。
  • val infoCF:定義列族名。
  • addColumnsUsingReflection:調用之前定義的函數來動態添加列。
  • table.put(put):將 Put 實例寫入 HBase 表。

這段代碼通過反射自動化了向 HBase Put 實例添加數據的過程,避免了手動為每個欄位編寫重覆代碼的需要。

然後我們需要對於dataStream應用剛纔寫的 WriteToHBase 類

應用 WriteToHBase 類

dataStream.addSink(new WriteToHBase)

二、HBase Shell操作

1. 啟動 HBase Shell

首先,我們需要進入 HBase Shell。在命令行中輸入:

hbase shell

2. 創建命名空間

如果命名空間 shtd_result 還不存在,需要先創建它。在 HBase Shell 中執行以下命令:

create_namespace 'shtd_result'

3. 創建表

接著,創建表 order_info。我們需要定義至少一個列族(在這個示例中,我將使用 info 作為列族名)。在 HBase Shell 中執行以下命令:

create 'shtd_result:order_info', 'info'

這裡,'shtd_result:order_info' 指定了完整的表名(包括命名空間),而 'info' 是列族名。

4. 驗證表創建

最後,您可以列出所有表來驗證新表是否已成功創建:

list

5. 查詢

scan 'shtd_result:order_info', {COLUMNS => ['info:consignee'], LIMIT => 5}

這裡的 scan 命令用於掃描 shtd_result:order_info 表,COLUMNS 參數指定我們只關心 info:consignee 列(假設 consignee 存儲在名為 info 的列族中),而 LIMIT => 5 指定我們只查看 5 條記錄。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 使用過Excel的用戶都知道,Excel可以方便的對數據進行分組,過濾,排序等操作,而在WPF中,預設提供的DataGrid只有很簡單的功能,那麼如何才能讓我們開發的DataGrid,也像Excel一樣具備豐富的客戶端操作呢?今天就以一個簡單的小例子,簡述如何在WPF中實現DataGrid的過濾,篩... ...
  • 通過二次開發可以擴展新的設備型號,以插件的方式快速集成到系統。下麵幾個步驟快速實現一個簡單的電子秤驅動。 預備動作,先瞭解一下系統介紹,文章最下麵有下載鏈接。 稱重系統免費下載,支持耀華、頂尖等多款設備型號 使用插件式開發稱重儀錶驅動,RS232串口對接各類地磅秤數據實現ERP管理 1、新建一個控制 ...
  • 一:背景 1. 講故事 總會有一些朋友問一個問題,在 Windows 中線程做了上下文切換,請問被切的線程他的寄存器上下文都去了哪裡?能不能給我挖出來?這個問題其實比較底層,如果對操作系統沒有個體系層面的理解以及做過源碼分析,其實很難說明白,這篇我們就從.NET高級調試的角度試著分析一下吧。 二:寄 ...
  • 本文提供倆種不需要手動添加編輯控制項方法。 方法一:創建新的右鍵菜單添加“執行選擇”按鈕,且抑制TreeList自帶菜單結果展示: 代碼: private void Form1_Load(object sender, EventArgs e) { CreateBarButtonItem(); } pr ...
  • git教程 代碼托管平臺:git.acwing.com 1 git基本概念 工作區:倉庫的目錄。工作區是獨立於各個分支的。 暫存區:數據暫時存放的區域,類似於工作區寫入版本庫前的緩存區。暫存區是獨立於各個分支的。切換分支不會新創建暫存區。 版本庫:存放所有已經提交到本地倉庫的代碼版本 版本結構:樹結 ...
  • 版權聲明:原創作品,謝絕轉載!否則將追究法律責任。 ————— 作者:kirin Linux 定位伺服器硬碟槽位的方法 1、安裝sas3ircu工具 2、獲取磁碟SN號碼 2.1、使用smartctl命令獲取 smartctl -a /dev/sd* |grep Serial 2.2、查看文件獲取 ...
  • 面試題 海量數據里查詢某一固定首碼的key 生產上如何限制 keys * / flushdb / flushall 等危險命令以防止誤刪誤用? MEMORY USAGE 命令用過嗎? BigKey問題,多大算big?如何發現?如何刪除?如何處理? BigKey你做過調優嗎?惰性釋放lazyfree了 ...
  • 一、在本地電腦上安裝Docker 1.安裝Docker (安裝最新的Docker版本) yum install docker-ce docker-ce-cli containerd.io docker-bulidx-plugin docker-compose-plugin 2.查看Docker版本 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...