一、HBase Shell操作 1、基本操作 1)進入HBase客戶端命令行 [root@bigdata1 hbase]$ bin/hbase shell 2)查看幫助命令 hbase(main):001:0> help 3)查看當前資料庫中有哪些表 hbase(main):002:0> list ...
一、HBase Shell操作
1、基本操作
1)進入HBase客戶端命令行
[root@bigdata1 hbase]$ bin/hbase shell
2)查看幫助命令
hbase(main):001:0> help
3)查看當前資料庫中有哪些表
hbase(main):002:0> list
2、表的操作
1)創建表
hbase(main):002:0> create 'student','info'
2)插入數據到表
hbase(main):003:0> put 'student','1001','info:sex','male'
hbase(main):004:0> put 'student','1001','info:age','18'
hbase(main):005:0> put 'student','1002','info:name','Janna'
hbase(main):006:0> put 'student','1002','info:sex','female'
hbase(main):007:0> put 'student','1002','info:age','20'
3)掃描查看表數據
hbase(main):008:0> scan 'student'
hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW => '1001'}
hbase(main):010:0> scan 'student',{STARTROW => '1001'}
4)查看表結構
hbase(main):011:0> describe 'student'
5)更新指定欄位的數據
hbase(main):012:0> put 'student','1001','info:name','Nick'
hbase(main):013:0> put 'student','1001','info:age','100'
6)查看“指定行”或“指定列族:列”的數據
hbase(main):014:0> get 'student','1001'
hbase(main):015:0> get 'student','1001','info:name'
7)統計表數據行數
hbase(main):021:0> count 'student'
8)刪除數據
刪除某rowkey的全部數據:
hbase(main):016:0> deleteall 'student','1001'
刪除某rowkey的某一列數據:
hbase(main):017:0> delete 'student','1002','info:sex'
9)清空表數據
hbase(main):018:0> truncate 'student'
提示:清空表的操作順序為先disable,然後再truncate。
10)刪除表
首先需要先讓該表為disable狀態:
hbase(main):019:0> disable 'student'
然後才能drop這個表:
hbase(main):020:0> drop 'student'
提示:如果直接drop表,會報錯:ERROR: Table student is enabled. Disable it first.
11)變更表信息
將info列族中的數據存放3個版本:
hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}
二、Flink整合HBase寫入操作
現在需要將Flink處理的數據存入HBase資料庫(namespace)shtd_result的order_info表中,rowkey為id的值,然後在Linux的HBase shell命令行中查詢列consignee,並查詢出任意5條
表空間為:shtd_result,表為order_info,列族為:info
表結構為:
欄位 | 類型 | 註釋 |
---|---|---|
rowkey | string | HBase的主鍵,值為id |
id | bigint | |
consignee | string | |
consignee_tel | string | |
final_total_amount | double | |
order_status | string | |
user_id | bigint | |
delivery_address | string | |
order_comment | string | |
out_trade_no | string | |
trade_body | string | |
create_time | string | 轉成yyyy-MM-dd hh:mm:ss格式的的字元串 |
operate_time | string | 轉成yyyy-MM-dd hh:mm:ss格式的的字元串 |
expire_time | string | 轉成yyyy-MM-dd hh:mm:ss格式的的字元串 |
tracking_no | string | |
parent_order_id | bigint | |
img_url | string | |
province_id | int | |
benefit_reduce_amount | double |
我們需要寫一個WriteToHBase類,集成自RichSinkFunction,RichSinkFunction 是一個抽象類,提供了一個更為豐富的介面,用於實現自定義的 Sink(接收器)功能。
在Scala api中RichSinkFunction的主要方法有open,invoke以及close。
-
open(Configuration parameters):
這個方法在 Sink 函數初始化時被調用,通常用於一次性的設置工作,例如打開資料庫連接或初始化狀態。
參數 parameters 提供了訪問 Flink 配置的能力。 -
invoke(value: T, context: SinkFunction.Context):
這是核心方法,用於處理每條流入的數據。
value 參數代表當前的數據元素。
context 提供了此元素的上下文信息,如當前的處理時間或事件時間。 -
close():
當 Sink 不再接收數據時調用此方法,用於執行清理工作,如關閉資料庫連接。
這個方法是在最後一次調用 invoke 方法後執行。
瞭解了這些方法後,我們來寫一下WriteToHBase
一、WriteToHBase的實現
class WriteToHBase extends RichSinkFunction[OrderData] {
@transient private var connection: Connection = _
@transient private var table: Table = _
override def open(parameters: Configuration): Unit = {
val config = HBaseConfiguration.create()
// 設置HBase配置, 如Zookeeper地址等
config.set("hbase.zookeeper.quorum", "bigdata1:2181")
connection = ConnectionFactory.createConnection(config)
table = connection.getTable(TableName.valueOf("shtd_result:order_info"))
}
override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
// 將 id 轉換為行鍵(假設 id 是唯一的)
val rowKey = Bytes.toBytes(value.id.toString)
// 為該行創建一個新的 Put 實例
val put = new Put(rowKey)
// 向 Put 實例中添加列
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee"), Bytes.toBytes(value.consignee))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee_tel"), Bytes.toBytes(value.consignee_tel))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("final_total_amount"), Bytes.toBytes(value.final_total_amount))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_status"), Bytes.toBytes(value.order_status))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(value.user_id))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("delivery_address"), Bytes.toBytes(value.delivery_address))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_comment"), Bytes.toBytes(value.order_comment))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("out_trade_no"), Bytes.toBytes(value.out_trade_no))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("trade_body"), Bytes.toBytes(value.trade_body))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("operate_time"), Bytes.toBytes(value.operate_time))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("expire_time"), Bytes.toBytes(value.expire_time))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("tracking_no"), Bytes.toBytes(value.tracking_no))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("parent_order_id"), Bytes.toBytes(value.parent_order_id))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("img_url"), Bytes.toBytes(value.img_url))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("province_id"), Bytes.toBytes(value.province_id))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("benefit_reduce_amount"), Bytes.toBytes(value.benefit_reduce_amount))
table.put(put)
}
override def close(): Unit = {
if (table != null) {
table.close()
}
if (connection != null) {
connection.close()
}
}
}
在 Scala 和 Java 中,@transient 關鍵字用於標記一個類的成員變數為“暫時的”(transient),這意味著這個變數不會被預設的序列化過程式列化。
在 Flink中,通常用於:
-
防止序列化問題:當一個對象需要在不同的機器或上下文中傳遞時,某些屬性可能不支持序列化(例如,資料庫連接),或者序列化這些屬性沒有意義(例如,臨時緩存)。使用 @transient 可以避免這些欄位在對象序列化時引發錯誤。
-
減少網路開銷:對於不需要跨節點傳輸的欄位,使用 @transient 可以減少不必要的網路傳輸開銷。
在 WriteToHBase 類中,connection 和 table 作為 HBase 的連接和表實例,通常不支持序列化,也不應該被序列化。所以,它們被標記為 @transient。
上面的向 Put 實例中添加列過於冗長,可以用反射來代替:
def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
//獲取運行時鏡像和實例鏡像
val mirror = runtimeMirror(getClass.getClassLoader)
val instanceMirror = mirror.reflect(data)
//獲取類成員並過濾方法
val members = typeOf[T].members.sorted.filterNot(_.isMethod)
//遍歷欄位並添加到 Put 實例
members.foreach { m =>
val fieldMirror = instanceMirror.reflectField(m.asTerm)
val name = m.name.toString.trim
val value = fieldMirror.get.toString
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
}
}
override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
val rowKey = Bytes.toBytes(value.id.toString)
val put = new Put(rowKey)
val infoCF = "info"
// 使用反射自動添加列
addColumnsUsingReflection(put, infoCF, value)
table.put(put)
}
現在來解釋一下這段代碼:
addColumnsUsingReflection
函數定義
def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
[T: TypeTag]
:類型參數T
,帶有一個上下文界定TypeTag
,這使得可以在運行時獲取類型T
的信息。(put: Put, cf: String, data: T)
:函數接受三個參數:put
是 HBase 的Put
實例,cf
是列族名,data
是要插入的數據對象。
獲取運行時鏡像和實例鏡像
val mirror = runtimeMirror(getClass.getClassLoader)
val instanceMirror = mirror.reflect(data)
val mirror
:創建一個Mirror
實例,它是反射 API 的入口點。runtimeMirror(getClass.getClassLoader)
:獲取當前類的類載入器的運行時鏡像。val instanceMirror
:反射data
對象,得到一個可以用來訪問data
實例成員的InstanceMirror
。
獲取類成員並過濾方法
val members = typeOf[T].members.sorted.filterNot(_.isMethod)
typeOf[T]
:獲取類型T
的類型信息。.members
:獲取類型T
的所有成員(欄位和方法)。.sorted
:對成員進行排序(預設按名稱)。.filterNot(_.isMethod)
:過濾掉方法成員,只保留欄位。
遍歷欄位並添加到 Put
實例
members.foreach { m =>
val fieldMirror = instanceMirror.reflectField(m.asTerm)
val name = m.name.toString.trim
val value = fieldMirror.get.toString
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
}
- 遍歷所有欄位。
val fieldMirror
:反射每個欄位,得到一個可以操作欄位的FieldMirror
。m.asTerm
:將成員m
轉換為一個 term(欄位)。val name
:獲取欄位名,並去除首尾空格。val value
:獲取欄位的值並轉換為字元串。put.addColumn
:向Put
實例添加列,列族為cf
,列名為欄位名name
,值為欄位值value
。
invoke
方法
override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
val rowKey = Bytes.toBytes(value.id.toString)
val put = new Put(rowKey)
val infoCF = "info"
addColumnsUsingReflection(put, infoCF, value)
table.put(put)
}
override def invoke
:重寫RichSinkFunction
的invoke
方法。val rowKey
:將OrderData
的id
欄位轉換為位元組作為行鍵。val put
:創建一個新的Put
實例。val infoCF
:定義列族名。addColumnsUsingReflection
:調用之前定義的函數來動態添加列。table.put(put)
:將Put
實例寫入 HBase 表。
這段代碼通過反射自動化了向 HBase Put 實例添加數據的過程,避免了手動為每個欄位編寫重覆代碼的需要。
然後我們需要對於dataStream應用剛纔寫的 WriteToHBase 類
應用 WriteToHBase 類
dataStream.addSink(new WriteToHBase)
二、HBase Shell操作
1. 啟動 HBase Shell
首先,我們需要進入 HBase Shell。在命令行中輸入:
hbase shell
2. 創建命名空間
如果命名空間 shtd_result
還不存在,需要先創建它。在 HBase Shell 中執行以下命令:
create_namespace 'shtd_result'
3. 創建表
接著,創建表 order_info
。我們需要定義至少一個列族(在這個示例中,我將使用 info
作為列族名)。在 HBase Shell 中執行以下命令:
create 'shtd_result:order_info', 'info'
這裡,'shtd_result:order_info'
指定了完整的表名(包括命名空間),而 'info'
是列族名。
4. 驗證表創建
最後,您可以列出所有表來驗證新表是否已成功創建:
list
5. 查詢
scan 'shtd_result:order_info', {COLUMNS => ['info:consignee'], LIMIT => 5}
這裡的 scan
命令用於掃描 shtd_result:order_info
表,COLUMNS
參數指定我們只關心 info:consignee
列(假設 consignee
存儲在名為 info
的列族中),而 LIMIT => 5
指定我們只查看 5 條記錄。