1.簡介 HBase是一個基於HDFS的、分散式的、面向列的非關係型資料庫。 HBase的特點 1.海量數據存儲,HBase表中的數據能夠容納上百億行*上百萬列。 2.面向列的存儲,數據在表中是按照列進行存儲的,能夠動態的增加列並對列進行各種操作。 3.準實時查詢,HBase在海量的數據量下能夠接近 ...
1.簡介
HBase是一個基於HDFS的、分散式的、面向列的非關係型資料庫。
HBase的特點
1.海量數據存儲,HBase表中的數據能夠容納上百億行*上百萬列。
2.面向列的存儲,數據在表中是按照列進行存儲的,能夠動態的增加列並對列進行各種操作。
3.準實時查詢,HBase在海量的數據量下能夠接近準實時的查詢(百毫秒以內)
4.多版本,HBase中每一列的數據都可以有多個版本。
5.可靠性,HBase中的數據存儲於HDFS中且依賴於Zookeeper進行Master和RegionServer的協調管理。
HBase與關係型資料庫的區別
1.HBase中的數據類型只有String,而關係型資料庫中有char、varchar、int等。
2.HBase中只有普通的增刪改查操作,沒有表與表之間的連接、子查詢等,若想要在HBase中進行複雜的操作則應該使用Phoenix。
3.HBase是基於列進行存儲的,因此在查詢指定列的數據時效率會很高,而關係型資料庫是基於行存儲,每次查詢都要查詢整行。
4.HBase適合海量數據存儲,而關係型資料庫一般一張表不超過500M,否則就要考慮分表操作。
5.HBase中為空的列不占用存儲空間,表的設計可以非常稀疏,而關係型資料庫中表的設計較謹密。
2.HBase的表結構
*HBase中的表由RowKey、ColumnFamily、Column、Timestamp組成。
RowKey
記錄的唯一標識,相當於關係型資料庫中的主鍵。
*RowKey最大長度為64KB且按字典順序進行排序存儲。
ColumnFamily
列簇相當於特定的一個類別,每個列簇下可以有任意數量個列,並且列是動態進行添加的,只在插入數據後存在,HBase在創建表時只需要指定表名和列簇即可。
*一個列簇下的成員有著相同的首碼,使用冒號來對列簇和列名進行分隔。
*一張表中的列簇最好不超過5個。
Column
列只有在插入數據後才存在,且列在列簇中是有序的。
*每個列簇下的列數沒有限制。
Timestamp
HBase中的每個鍵值對都有一個時間戳,在進行插入時由HBase進行自動賦值。
3.HBase的物理模型
Master
1.處理對錶的添加、刪除、查詢等操作。
2.進行RegionServer的負載均衡(Region與RegionServer的分配)
3.在RegionServer宕機後負責RegionServer上的Region轉移(通過WAL日誌)
*Master失效僅會導致meta數據和表無法被修改,表中的數據仍然可以進行讀取和寫入。
RegionServer
1.處理對錶中數據的添加、刪除、修改、查詢等操作。
2.維護Region並將Region中StoreFile寫入到HDFS中。
3.當Region中的數據達到一定大小時進行Region的切分。
Region
1.表中的數據存儲在Region中,每個Region都由RegionServer進行管理。
2.每個Region都包含MemoryStore和StoreFile,MemoryStore中的數據位於記憶體,每當MemoryStore中的數據達到128M時將會生成一個StoreFile並寫入到HDFS中。
3.Region中每個列簇對應一個MemoryStore,可以有多個StoreFile,當多個StoreFile的文件大小超過一定時,會進行StoreFile的合併,將多個StoreFile文件合併成一個StoreFile,當StoreFile中的大小超過一定閥值時,會進行Region的切分,由Master將新Region分配到相應的RegionServer中,實現負載均衡。
Zookeeper在HBase中的作用
1.保證Master的高可用性,當狀態為Active的Master無法提供服務時,會立刻將狀態為StandBy的Master切換為Active狀態。
2.實時監控RegionServer集群,當某個RegionServer節點無法提供服務時將會通知Master,由Master進行RegionServer上的Region轉移以及重新進行負載均衡。
3.當HBase集群啟動後,Master和RegionServer會分別向Zookeeper進行註冊,會在Zookeeper中存放HBase的meta表數據,Region與RegionServer的關係、以及RegionServer的訪問地址等信息。
*meta表中維護著TableName、RowKey和Region的關聯關係。
HBase處理讀取和寫入請求的流程
HBase處理讀取請求的過程
1.客戶端連接Zookeeper,根據TableName和RowKey從Meta表中計算出該Row對應的Region。
2.獲取該Region所關聯的RegionServer,並獲取RegionServer的訪問地址。
3.訪問RegionServer,找到對應的Region。
4.如果Region的MemoryStore中有該Row則直接進行獲取,否則從StoreFile中進行查詢。
HBase處理寫入請求的過程
1.客戶端連接Zookeeper,根據TableName找到其Region列表。
2.通過一定演算法計算出要寫入的Region。
3.獲取該Region所關聯的RegionServer併進行連接。
4.把數據分別寫到HLog和MemoryStore中。
5.每當MemoryStore中的大小達到128M時,會生成一個StoreFile。
6.當多個StoreFile的文件大小達到一定時,會進行StoreFile的合併,將多個StoreFile文件合併成一個StoreFile,當StoreFile的文件大小超過一定閾值時,會進行Region的切分,由Master將新Region分配到相應的RegionServer中,實現負載均衡。
*在第一次讀取或寫入時才需要連接Zookeeper,會將Zookeeper中的相關數據緩存到本地,往後直接從本地進行讀取,當Zookeeper中的信息發生變化時,再通過通知機制通知客戶端進行更新。
HBase在HDFS中的目錄
1.tmp目錄:當對HBase的表進行創建和刪除時,會將表移動到該目錄中進行操作。
2.MasterProcWALs目錄:預寫日誌目錄,主要用於存儲Master的操作日誌。
3.WALs目錄:預寫日誌目錄,主要用於存儲RegionServer的操作日誌。
4.data目錄:存儲Region中的StoreFile。
5.hbase.id文件:HBase集群的唯一標識。
6.hbase.version文件:HBase集群的版本號。
7.oldWALs目錄:當WALs目錄下的日誌文件超過一定時間後,會將其移動到oldWALs目錄中,Master會定期進行清理。
4.HBase集群的搭建
1.安裝JDK和Hadoop
由於HBase是通過JAVA語言編寫的,且HBase是基於HDFS的,因此需要安裝JDK和Hadoop,並配置好JAVA_HOME環境變數。
由於HDFS一般都以集群的方式運行,因此需要搭建HDFS集群。
*在搭建HDFS集群時,需要相互配置SSH使之互相信任並且開放防火牆相應的埠,或者直接關閉防火牆。
2.安裝Zookeeper併進行集群的搭建
由於HDFS HA依賴於Zookeeper,且HBase也依賴於Zookeeper,因此需要安裝Zookeeper併進行集群的搭建。
3.安裝HBase
1.從CDH中下載HBase併進行解壓:http://archive.cloudera.com/cdh5/cdh/5/
2.修改hbase-env.sh配置文件
#設置JDK的安裝目錄
export JAVA_HOME=/usr/jdk8/jdk1.8.0_161
#true則使用hbase自帶的zk服務,false則使用外部的zk服務.
export HBASE_MANAGES_ZK=flase
3.修改hbase-site.xml配置文件
<!-- 指定HBase日誌的存放目錄 -->
<property>
<name>hbase.tmp.dir</name>
<value>/usr/hbase/hbase-1.2.8/logs</value>
</property>
<!-- 指定HBase中的數據存儲在HDFS中的目錄 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://nameservice:8020/hbase</value>
</property>
<!-- 設置是否是分散式 -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- 指定HBase使用的ZK地址 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181</value>
</property>
4.修改regionservers文件,配置充當RegionServer的節點
*值可以是主機名或者IP地址
*如果Hadoop配置了HDFS HA高可用集群,那麼就會有兩個NameNode和一個NameService,此時就需要將HDFS的core-site.xml和hdfs-site.xml配置文件複製到HBase的conf目錄下,且hbase-site.xml配置文件中的hbase.rootdir配置項的HDFS地址指向NameService的名稱。
5.NTP時間同步
NTP是一個時間伺服器,作用是使集群中的各個節點的時間都保持一致。
由於在HBase集群中,Zookeeper與HBase對時間的要求較高,如果兩個節點之間的時間相差過大時,那麼整個集群就會崩潰,因此需要使各個節點的時間都保持一致。
#查看是否安裝了NTP服務
rpm -qa|grep ntp
#安裝NTP服務
yum install ntp -y
#從NTP伺服器中獲取時間並同步本地
ntpdate 192.168.1.80
*在實際的應用場景中,可以自己搭建NTP伺服器,也可以使用第三方開源的NTP伺服器,如阿裡等。
使用 “ntpdate NTP伺服器地址” 命令從NTP伺服器中獲取時間並同步本地,一般配合Linux的crontab使用,每隔5分鐘進行一次時間的同步。
4.啟動集群
使用bin目錄下的start-hbase.sh命令啟動集群,那麼會在當前節點中啟動一個Master和RegionSever進程,並通過SSH訪問其它節點,啟動RegionServer進程。
由於HBase的Master HA集群是通過Zookeeper進行協調的,需要手動在其他節點中啟動Master,Zookeeper能保證當前HBase集群中有且只有一個Master處於Active狀態,當狀態為Active的Master無法正常提供服務時,會將處於StandBy的Master的狀態修改為Active。
*當HBase集群啟動後,可以訪問http:/localhost:16030,進入HBase的Web監控頁面。
5.使用Shell操作HBase
使用bin/hbase shell命令進行HBase的Shell操作
#創建表
create 'tableName' , 'columnFamily' , 'columnFamily...'
#添加記錄
put 'tableName' , 'rowkey' , 'columnFamily:column' , 'value'
#查詢記錄
get 'tableName' , 'rowkey'
#統計表的記錄數
count 'tableName'
#刪除記錄
deleteall 'tableName' , 'rowkey'
#刪除記錄的某一列
delete 'tableName' , 'rowkey' ,'columnFamily:column'
#禁用表
disable 'tableName'
#啟動表
enable 'tableName'
#查看表是否被禁用
is_disabled 'tableName'
#刪除表
drop 'tableName'
#查看表中的所有記錄
scan 'tableName'
#查看表中指定列的所有記錄
scan 'tableName' , {COLUMNS=>'columnFamily:column'}
#檢查表是否存在
exists 'tableName'
#查看當前HBase中的表
list
*在刪除表時需要禁用表,否則無法刪除。
*使用put相同rowkey的一條數據來進行記錄的更新,僅會更新列相同的值。
6.使用JAVA操作HBase
1.導入相關依賴
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.8</version>
</dependency>
2.初始化配置
使用HBaseConfiguration的create()靜態方法創建一個Configuration實例,用於封裝環境配置信息。
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum","192.168.1.80,192.168.1.81,192.168.1.82");
config.set("hbase.zookeeper.property.clientPort","2181");
*此方法會預設載入classpath下的hbase-site.xml配置文件,如果沒有此配置文件則需要手動進行環境的配置。
3.創建HBase連接對象
Connection conn = ConnectionFactory.createConnection(config);
4.進行表的管理
*使用Admin類進行HBase表的管理,通過Connection實例的getAdmin()靜態方法返回一個Admin實例。
//判斷表是否存在
boolean tableExists(TableName);
//遍歷HBase中的表定義
HTableDescriptor [] listTables();
//遍歷HBase中的表名稱
TableName [] listTableNames();
//根據表名獲取表定義
HTableDescriptor getTableDescriptor(TableName);
//創建表
void createTable(HTableDescriptor);
//刪除表
void deleteTable(TableName);
//啟用表
void enableTable(TableName);
//禁用表
void disableTable(TableName);
//判斷表是否是啟用狀態
boolean isTableEnabled(TableName);
//判斷表是否是禁用狀態
boolean isTableDisabled(TableName);
//為表添加列簇
void addColumn(TableName,HColumnDescriptor);
//刪除表中的列簇
void deleteColumn(TableName,byte);
//修改表中的列簇
void modifyColumn(TableName,HColumnDescriptor);
TableName實例用於封裝表名稱。
HTableDescriptor實例用於封裝表定義,包括表的名稱、表的列簇等。
HColumnDescriptor實例用於封裝表的列簇。
5.對錶中的數據進行增刪改查
使用Table類進行表數據的增刪改查,通過Connection的getTable(TableName)靜態方法返回一個Table實例。
//判斷指定RowKey的數據是否存在
boolean exists(Get get);
//根據RowKey獲取數據
Result get(Get get);
//根據多個RowKey獲取數據
Result [] get(List<Get>);
//獲取表的掃描器
ResultScanner getScanner(Scan);
//添加數據
void put(Put);
//批量添加數據
void put(List<Put>);
//刪除數據
void delete(Delete);
//批量刪除數據
void delete(List<Delete>)
使用Get實例封裝查詢參數,使用其構建方法設置RowKey。
使用Put實例封裝新增和更新參數,使用其構建方法設置RowKey,使用其addColumn(byte[] family , byte[] qualifier , byte[] value)方法分別指定列簇、列名、列值。
使用Delete實例封裝刪除參數,使用其構建方法設置RowKey。
使用Scan實例封裝掃描器的查詢條件,使用其addFamily(byte[] family)方法設置掃描的列簇,使用其addColumn(byte[] family , byte[] qualifier)方法分別指定要掃描的列簇和列名。
*在進行表的增刪改查時,方法參數大多都是位元組數組類型,可以使用HBase Java提供的Bytes工具類進行字元串和位元組數組之間的轉換。
*在進行查詢操作時,會返回Result實例,Result實例包含了一個RowKey的所有鍵值對(cell,不區分列簇),可以通過Result實例的listCells()方法獲取其包含的所有cell,藉助CellUtil工具類獲取Cell實例中對應的RowKey、Family、Qualifier、Value等屬性信息。
*在使用getScanner掃描時,返回的ResultScanner介面繼承Iterable介面,其泛型是Result,因此可以理解成ResultScanner是Result的一個集合。
6.完整的HBaseUtil
/** * @Auther: ZHUANGHAOTANG * @Date: 2018/11/26 11:40 * @Description: */ public class HBaseUtils { private static final Logger logger = LoggerFactory.getLogger(HBaseUtils.class); /** * ZK集群地址 */ private static final String ZK_CLUSTER_HOSTS = "192.168.1.80,192.168.1.81,192.168.1.82"; /** * ZK埠 */ private static final String ZK_CLUSTER_PORT = "2181"; /** * HBase全局連接 */ private static Connection connection; static { //預設載入classpath下hbase-site.xml文件 Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", ZK_CLUSTER_HOSTS); configuration.set("hbase.zookeeper.property.clientPort", ZK_CLUSTER_PORT); try { connection = ConnectionFactory.createConnection(configuration); } catch (Exception e) { logger.info("初始化HBase連接失敗:", e); } } /** * 返回連接 */ public static Connection getConnection() { return connection; } /** * 創建表 */ public static void createTable(String tableName, String... families) throws Exception { Admin admin = connection.getAdmin(); if (admin.tableExists(TableName.valueOf(tableName))) { throw new UnsupportedOperationException("tableName " + tableName + " is already exists"); } HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName)); for (String family : families) descriptor.addFamily(new HColumnDescriptor(family)); admin.createTable(descriptor); } /** * 刪除表 */ public static void deleteTable(String tableName) throws Exception { Admin admin = connection.getAdmin(); if (admin.tableExists(TableName.valueOf(tableName))) { admin.disableTable(TableName.valueOf(tableName)); admin.deleteTable(TableName.valueOf(tableName)); } } /** * 獲取所有表名稱 */ public static TableName[] getTableNameList() throws Exception { Admin admin = connection.getAdmin(); return admin.listTableNames(); } /** * 獲取所有表定義 */ public static HTableDescriptor[] getTableDescriptorList() throws Exception { Admin admin = connection.getAdmin(); return admin.listTables(); } /** * 為表添加列簇 */ public static void addFamily(String tableName, String family) throws Exception { Admin admin = connection.getAdmin(); if (!admin.tableExists(TableName.valueOf(tableName))) { throw new UnsupportedOperationException("tableName " + tableName + " is not exists"); } admin.addColumn(TableName.valueOf(tableName), new HColumnDescriptor(family)); } /** * 刪除表中指定的列簇 */ public static void deleteFamily(String tableName, String family) throws Exception { Admin admin = connection.getAdmin(); admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(family)); } /** * 為表添加一條數據 */ public static void put(String tableName, String rowKey, String family, Map<String, String> values) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); for (Map.Entry<String, String> entry : values.entrySet()) put.addColumn(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue())); table.put(put); } /** * 批量為表添加數據 */ public static void batchPut(String tableName, String family, Map<String, Map<String, String>> values) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); List<Put> puts = new ArrayList<>(); for (Map.Entry<String, Map<String, String>> entry : values.entrySet()) { Put put = new Put(Bytes.toBytes(entry.getKey())); for (Map.Entry<String, String> subEntry : entry.getValue().entrySet()) put.addColumn(Bytes.toBytes(family), Bytes.toBytes(subEntry.getKey()), Bytes.toBytes(subEntry.getValue())); puts.add(put); } table.put(puts); } /** * 刪除RowKey中的某列 */ public static void deleteColumn(String tableName, String rowKey, String family, String qualifier) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); Delete delete = new Delete(Bytes.toBytes(rowKey)); delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); table.delete(delete); } /** * 刪除RowKey */ public static void delete(String tableName, String rowKey) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); table.delete(new Delete(Bytes.toBytes(rowKey))); } /** * 批量刪除RowKey */ public static void batchDelete(String tableName, String... rowKeys) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); List<Delete> deletes = new ArrayList<>(); for (String rowKey : rowKeys) deletes.add(new Delete(Bytes.toBytes(rowKey))); table.delete(deletes); } /** * 根據RowKey獲取數據 */ public static Map<String, String> get(String tableName, String rowKey) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); Result result = table.get(new Get(Bytes.toBytes(rowKey))); List<Cell> cells = result.listCells(); Map<String, String> cellsMap = new HashMap<>(); for (Cell cell : cells) { cellsMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } return cellsMap; } /** * 獲取全表數據 */ public static Map<String, Map<String, String>> scan(String tableName) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); ResultScanner resultScanner = table.getScanner(new Scan()); return getResult(resultScanner); } /** * 獲取某列數據 */ public static Map<String, Map<String, String>> scan(String tableName, String family, String qualifier) throws Exception { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); ResultScanner resultScanner = table.getScanner(scan); return getResult(resultScanner); } private static Map<String, Map<String, String>> getResult(ResultScanner resultScanner) { Map<String, Map<String, String>> resultMap = new HashMap<>(); for (Result result : resultScanner) { List<Cell> cells = result.listCells(); Map<String, String> cellsMap = new HashMap<>(); for (Cell cell : cells) cellsMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); resultMap.put(Bytes.toString(result.getRow()), cellsMap); } return resultMap; } }