測試環境:Idea+Windows10 準備工作: <1>、打開本地 C:\Windows\System32\drivers\etc(系統預設)下名為hosts的系統文件,如果提示當前用戶沒有許可權打開文件;第一種方法是將hosts文件拖到桌面進行配置後再拖回原處;第二種一勞永逸的方法是修改當前用戶對 ...
測試環境:Idea+Windows10
準備工作:
<1>、打開本地 C:\Windows\System32\drivers\etc(系統預設)下名為hosts的系統文件,如果提示當前用戶沒有許可權打開文件;第一種方法是將hosts文件拖到桌面進行配置後再拖回原處;第二種一勞永逸的方法是修改當前用戶對該文件的許可權為完全控制;
<2>、打開後hosts文件後,添加HBase集群伺服器的用戶名及IP地址如下:
<3>、由於是windows系統下遠程連接HBase,而HBase底層依賴Hadoop,所以需要下載hadoop二進位包存放到本地目錄將來會在程式中引用該目錄,否則會報錯。你也可以理解為windows下需要模擬linux環境才能正常連接HBasehadoop;(註:windows下的版本需要和linux下一致,這裡我僅僅提供的2.6.0hadoop版本解析包)
程式代碼:
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spring_hbase</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring_hbase</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--HBase依賴-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop-core</artifactId>
<version>2.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>1.2.1</version>
<type>pom</type>
</dependency>
<!--HBase依賴-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
HBaseUtils.class:
package com.example.spring_hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* HBase工具類
* Author JiaPeng_lv
*/
public class HBaseUtils {
private static Connection connection;
private static Configuration configuration;
private static HBaseUtils hBaseUtils;
private static Properties properties;
/**
* 創建連接池並初始化環境配置
*/
public void init(){
properties = System.getProperties();
//實例化HBase配置類
if (configuration==null){
configuration = HBaseConfiguration.create();
}
try {
//載入本地hadoop二進位包
properties.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.6.0-bin-master");
//zookeeper集群的URL配置信息
configuration.set("hbase.zookeeper.quorum","k1,k2,k3,k4,k5");
//HBase的Master
configuration.set("hbase.master","hba:60000");
//客戶端連接zookeeper埠
configuration.set("hbase.zookeeper.property.clientPort","2181");
//HBase RPC請求超時時間,預設60s(60000)
configuration.setInt("hbase.rpc.timeout",20000);
//客戶端重試最大次數,預設35
configuration.setInt("hbase.client.retries.number",10);
//客戶端發起一次操作數據請求直至得到響應之間的總超時時間,可能包含多個RPC請求,預設為2min
configuration.setInt("hbase.client.operation.timeout",30000);
//客戶端發起一次scan操作的rpc調用至得到響應之間的總超時時間
configuration.setInt("hbase.client.scanner.timeout.period",200000);
//獲取hbase連接對象
if (connection==null||connection.isClosed()){
connection = ConnectionFactory.createConnection(configuration);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 關閉連接池
*/
public static void close(){
try {
if (connection!=null)connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 私有無參構造方法
*/
private HBaseUtils(){}
/**
* 唯一實例,線程安全,保證連接池唯一
* @return
*/
public static HBaseUtils getInstance(){
if (hBaseUtils == null){
synchronized (HBaseUtils.class){
if (hBaseUtils == null){
hBaseUtils = new HBaseUtils();
hBaseUtils.init();
}
}
}
return hBaseUtils;
}
/**
* 獲取單條數據
* @param tablename
* @param row
* @return
* @throws IOException
*/
public static Result getRow(String tablename, byte[] row) throws IOException{
Table table = null;
Result result = null;
try {
table = connection.getTable(TableName.valueOf(tablename));
Get get = new Get(row);
result = table.get(get);
}finally {
table.close();
}
return result;
}
/**
* 查詢多行信息
* @param tablename
* @param rows
* @return
* @throws IOException
*/
public static Result[] getRows(String tablename,List<byte[]> rows) throws IOException{
Table table = null;
List<Get> gets = null;
Result[] results = null;
try {
table = connection.getTable(TableName.valueOf(tablename));
gets = new ArrayList<Get>();
for (byte[] row : rows){
if(row!=null){
gets.add(new Get(row));
}
}
if (gets.size() > 0) {
results = table.get(gets);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
table.close();
}
return results;
}
/**
* 獲取整表數據
* @param tablename
* @return
*/
public static ResultScanner get(String tablename) throws IOException{
Table table = null;
ResultScanner results = null;
try {
table = connection.getTable(TableName.valueOf(tablename));
Scan scan = new Scan();
scan.setCaching(1000);
results = table.getScanner(scan);
} catch (IOException e) {
e.printStackTrace();
}finally {
table.close();
}
return results;
}
/**
* 單行插入數據
* @param tablename
* @param rowkey
* @param family
* @param cloumns
* @throws IOException
*/
public static void put(String tablename, String rowkey, String family, Map<String,String> cloumns) throws IOException{
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tablename));
Put put = new Put(rowkey.getBytes());
for (Map.Entry<String,String> entry : cloumns.entrySet()){
put.addColumn(family.getBytes(),entry.getKey().getBytes(),entry.getValue().getBytes());
}
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}finally {
table.close();
close();
}
}
}
①、保證該工具類唯一實例
②、全局共用重量級類Connection,該類為線程安全,使用完畢後關閉連接池
③、每次執行內部CRUD方法會創建唯一對象Table,該類為非線程安全,使用完畢後關閉
由於時間原因,內部功能方法及測試較少,有其他需求的可以自行百度添加更多方法,這裡主要以類結構及配置為主。
Test.class:
package com.example.spring_hbase;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.*;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringHbaseApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void test01(){
HBaseUtils.getInstance();
try {
Long time = System.currentTimeMillis();
Result result = HBaseUtils.getRow("GPS_MAP", Bytes.toBytes(1));
System.out.println("本次查詢耗時:"+(System.currentTimeMillis()-time)*1.0/1000+"s");
NavigableMap<byte[],NavigableMap<byte[],NavigableMap<Long,byte[]>>> navigableMap = result.getMap();
for (byte[] family:navigableMap.keySet()){
System.out.println("columnFamily:"+ new String(family));
for (byte[] column : navigableMap.get(family).keySet()){
System.out.println("column:"+new String(column));
for (Long t : navigableMap.get(family).get(column).keySet()){
System.out.println("value:"+new String(navigableMap.get(family).get(column).get(t)));
}
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
HBaseUtils.close();
}
}
@Test
public void test02(){
HBaseUtils.getInstance();
ResultScanner results = null;
try {
Long time = System.currentTimeMillis();
results = HBaseUtils.get("GPS_MAP");
System.out.println("本次查詢耗時:"+(System.currentTimeMillis()-time)*1.0/1000+"s");
for (Result result : results){
NavigableMap<byte[],NavigableMap<byte[],NavigableMap<Long,byte[]>>> navigableMap = result.getMap();
for (byte[] family:navigableMap.keySet()){
System.out.println("columnFamily:"+ new String(family));
for (byte[] column : navigableMap.get(family).keySet()){
System.out.println("column:"+new String(column));
for (Long t : navigableMap.get(family).get(column).keySet()){
System.out.println("value:"+new String(navigableMap.get(family).get(column).get(t)));
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
results.close();
HBaseUtils.close();
}
}
@Test
public void test03(){
HBaseUtils.getInstance();
Result[] results = null;
List<byte[]> list = null;
try {
list = new ArrayList<byte[]>();
list.add(Bytes.toBytes(1));
list.add(Bytes.toBytes(2));
Long time = System.currentTimeMillis();
results = HBaseUtils.getRows("GPS_MAP",list);
System.out.println("本次查詢耗時:"+(System.currentTimeMillis()-time)*1.0/1000+"s");
for (Result result : results){
NavigableMap<byte[],NavigableMap<byte[],NavigableMap<Long,byte[]>>> navigableMap = result.getMap();
for (byte[] family:navigableMap.keySet()){
System.out.println("columnFamily:"+ new String(family));
for (byte[] column : navigableMap.get(family).keySet()){
System.out.println("column:"+new String(column));
for (Long t : navigableMap.get(family).get(column).keySet()){
System.out.println("value:"+new String(navigableMap.get(family).get(column).get(t)));
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
HBaseUtils.close();
}
}
@Test
public void test04(){
HBaseUtils.getInstance();
try {
Map<String,String> cloumns = new HashMap<String, String>();
cloumns.put("test01","test01");
cloumns.put("test02","test02");
Long time = System.currentTimeMillis();
HBaseUtils.put("GPS_MAP","3","TEST",cloumns);
System.out.println("本次插入耗時:"+(System.currentTimeMillis()-time)*1.0/1000+"s");
} catch (IOException e) {
e.printStackTrace();
}finally {
HBaseUtils.close();
}
}
}
測試後發現查詢和插入效率相對於沒有優化過的類耗時大大縮減;