HBase provides a rich Java API, including pooled table access (HTablePool). Below I use this pool to demonstrate operating HBase through the Java API.
The project structure is as follows:
The HBase version I am using is
hbase-0.98.9-hadoop2-bin.tar.gz
After downloading it, you can take the jar files from its lib directory; these are the hbase-lib resources shown in the project structure above.
The interface:
/hbase-util/src/com/b510/hbase/util/dao/HbaseDao.java
package com.b510.hbase.util.dao;

import java.util.List;

import org.apache.hadoop.hbase.client.HTableInterface;

/**
 * @author Hongten
 * @created 7 Nov 2018
 */
public interface HbaseDao {

    // get a table from the pool
    public HTableInterface getHTableFromPool(String tableName);

    // check whether the table exists
    public boolean isHTableExist(String tableName);

    // create table
    public void createHTable(String tableName, String[] columnFamilys);

    // insert a new row
    public void addRow(String tableName, String rowKey, String columnFamily, String column, String value);

    // get a row by row key
    public void getRow(String tableName, String rowKey);

    // scan all rows of the table
    public void getAllRows(String tableName);

    // get rows within the given row key range
    public void getRowsByRange(String tableName, String startRowKey, String endRowKey);

    // delete a row
    public void delRow(String tableName, String rowKey);

    // delete rows by row keys
    public void delRowsByRowKeys(String tableName, List<String> rowKeys);

    // turn off auto flush (writes are buffered and flushed when the table is closed)
    public void closeAutoFlush(HTableInterface table);

    // close (return) the table
    public void closeTable(HTableInterface table);

    // close the pool connection
    public void closePoolConnection();

    // delete table
    public void deleteHTable(String tableName);
}
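The interface exposes getHTableFromPool(), closeTable() and closePoolConnection() because the implementation below wraps an HTablePool. For orientation, here is a minimal sketch of the borrow/return cycle that the pool imposes; the table name, column family and row values are illustrative only and are not taken from the project:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;

public class PoolCycleSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // assumption: same ZooKeeper quorum as the implementation below
        conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");

        // keep up to 5 HTable instances per table in the pool
        HTablePool pool = new HTablePool(conf, 5);

        // borrow a table from the pool
        HTableInterface table = pool.getTable("t_demo");
        try {
            Put put = new Put("row1".getBytes());
            put.add("cf1".getBytes(), "name".getBytes(), "Hongten".getBytes());
            table.put(put);
        } finally {
            // for a pooled table, close() returns the instance to the pool instead of destroying it
            table.close();
        }

        // release all pooled tables when the application is finished
        pool.close();
    }
}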
The implementation class:
/hbase-util/src/com/b510/hbase/util/dao/impl/HbaseDaoImpl.java
package com.b510.hbase.util.dao.impl;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

import com.b510.hbase.util.dao.HbaseDao;

/**
 * @author Hongten
 * @created 7 Nov 2018
 */
@SuppressWarnings("deprecation")
public class HbaseDaoImpl implements HbaseDao {

    private static Configuration conf = null;
    private static HBaseAdmin hAdmin;
    private static HTablePool pool;

    private static int defaultPoolSize = 5;

    public HbaseDaoImpl(int poolSize) {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");
        try {
            hAdmin = new HBaseAdmin(conf);
            // the default pool size is 5
            pool = new HTablePool(conf, poolSize <= 0 ? defaultPoolSize : poolSize);
        } catch (MasterNotRunningException e) {
            e.printStackTrace();
        } catch (ZooKeeperConnectionException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public HTableInterface getHTableFromPool(String tableName) {
        HTableInterface table = pool.getTable(tableName);
        return table;
    }

    @Override
    public boolean isHTableExist(String tableName) {
        try {
            return hAdmin.tableExists(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public void createHTable(String tableName, String[] columnFamilys) {
        if (!isHTableExist(tableName)) {
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // HBase recommends using no more than 3 column families per table;
            // normally a table has only 1 column family.
            for (String cfName : columnFamilys) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfName);
                tableDescriptor.addFamily(hColumnDescriptor);
            }
            try {
                hAdmin.createTable(tableDescriptor);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("The table [" + tableName + "] is created.");
        } else {
            System.out.println("The table [" + tableName + "] already exists.");
        }
    }

    @Override
    public void addRow(String tableName, String rowKey, String columnFamily, String column, String value) {
        if (isHTableExist(tableName)) {
            HTableInterface table = getHTableFromPool(tableName);
            Put put = new Put(rowKey.getBytes());
            put.add(columnFamily.getBytes(), column.getBytes(), value.getBytes());
            try {
                table.put(put);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Insert into table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + column + "], Value=[" + value + "].");
            closeTable(table);
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    @Override
    public void getRow(String tableName, String rowKey) {
        if (isHTableExist(tableName)) {
            HTableInterface table = getHTableFromPool(tableName);
            Get get = new Get(rowKey.getBytes());
            Result result;
            try {
                result = table.get(get);
                String columnName = "";
                String timeStamp = "";
                String columnFamily = "";
                String value = "";
                for (Cell cell : result.rawCells()) {
                    timeStamp = String.valueOf(cell.getTimestamp());
                    columnFamily = new String(CellUtil.cloneFamily(cell));
                    columnName = new String(CellUtil.cloneQualifier(cell));
                    value = new String(CellUtil.cloneValue(cell));

                    System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Value=[" + value + "].");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            closeTable(table);
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    @Override
    public void getAllRows(String tableName) {
        if (isHTableExist(tableName)) {
            Scan scan = new Scan();
            scanHTable(tableName, scan);
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    private void scanHTable(String tableName, Scan scan) {
        try {
            HTableInterface table = getHTableFromPool(tableName);
            ResultScanner results = table.getScanner(scan);
            for (Result result : results) {
                String rowKey = "";
                String columnName = "";
                String timeStamp = "";
                String columnFamily = "";
                String value = "";
                for (Cell cell : result.rawCells()) {
                    rowKey = new String(CellUtil.cloneRow(cell));
                    timeStamp = String.valueOf(cell.getTimestamp());
                    columnFamily = new String(CellUtil.cloneFamily(cell));
                    columnName = new String(CellUtil.cloneQualifier(cell));
                    value = new String(CellUtil.cloneValue(cell));

                    System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Value=[" + value + "].");
                }
            }
            results.close();
            closeTable(table);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void getRowsByRange(String tableName, String startRowKey, String endRowKey) {
        if (isHTableExist(tableName)) {
            Scan scan = new Scan();
            scan.setStartRow(startRowKey.getBytes());
            // the stop row key is exclusive: rows matching the stop row key are not returned
            // (HBase version 0.98.9)
            scan.setStopRow(endRowKey.getBytes());
            scanHTable(tableName, scan);
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    @Override
    public void delRow(String tableName, String rowKey) {
        if (isHTableExist(tableName)) {
            HTableInterface table = getHTableFromPool(tableName);
            deleteRow(table, rowKey);
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    private void deleteRow(HTableInterface table, String rowKey) {
        Delete del = new Delete(rowKey.getBytes());
        try {
            table.delete(del);
            System.out.println("Delete from table [" + new String(table.getTableName()) + "], Rowkey=[" + rowKey + "].");
            closeTable(table);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void delRowsByRowKeys(String tableName, List<String> rowKeys) {
        if (rowKeys != null && rowKeys.size() > 0) {
            for (String rowKey : rowKeys) {
                delRow(tableName, rowKey);
            }
        }
    }

    @Override
    public void deleteHTable(String tableName) {
        if (isHTableExist(tableName)) {
            try {
                hAdmin.disableTable(tableName.getBytes());
                hAdmin.deleteTable(tableName.getBytes());
                System.out.println("The table [" + tableName + "] is deleted.");
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    @Override
    public void closeAutoFlush(HTableInterface table) {
        // disable auto flush so that puts are buffered on the client and flushed on close()
        table.setAutoFlush(false, false);
    }

    @Override
    public void closeTable(HTableInterface table) {
        try {
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void closePoolConnection() {
        try {
            pool.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
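A note on the pooling theme from the introduction: HTablePool exists because a single HTable instance is not safe for concurrent use, so the pool hands each caller its own instance. The sketch below is a hypothetical way to drive the HbaseDaoImpl above from a JDK thread pool; the class name HbaseDaoConcurrencyDemo, the table "t_demo" and the row keys are illustrative only and are not part of the project:

package com.b510.hbase.util.dao.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.b510.hbase.util.dao.HbaseDao;
import com.b510.hbase.util.dao.impl.HbaseDaoImpl;

public class HbaseDaoConcurrencyDemo {

    public static void main(String[] args) throws InterruptedException {
        // one shared DAO; its HTablePool keeps up to 4 HTable instances
        final HbaseDao dao = new HbaseDaoImpl(4);
        dao.createHTable("t_demo", new String[] { "cf1" });

        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 8; i++) {
            final String rowKey = "row-" + i;
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    // each call borrows its own HTableInterface inside addRow(),
                    // so no HTable instance is shared between threads
                    dao.addRow("t_demo", rowKey, "cf1", "name", "value-" + rowKey);
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);

        dao.getAllRows("t_demo");
        dao.deleteHTable("t_demo");
        dao.closePoolConnection();
    }
}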
The test class:
/hbase-util/src/com/b510/hbase/util/dao/test/HbaseDaoTest.java
package com.b510.hbase.util.dao.test;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import com.b510.hbase.util.dao.HbaseDao;
import com.b510.hbase.util.dao.impl.HbaseDaoImpl;

/**
 * @author Hongten
 * @created 7 Nov 2018
 */
public class HbaseDaoTest {

    HbaseDao dao = new HbaseDaoImpl(4);

    public static final String tableName = "t_test";
    public static final String columnFamilyName = "cf1";
    public static final String[] CFs = { columnFamilyName };

    public static final String COLUMN_NAME_NAME = "name";
    public static final String COLUMN_NAME_AGE = "age";

    @Test
    public void main() {
        createTable();
        addRow();
        getRow();
        getAllRows();
        getRowsByRange();
        delRow();
        delRowsByRowKeys();
        deleteHTable();
    }

    public void createTable() {
        System.out.println("=== create table ====");
        dao.createHTable(tableName, CFs);
    }

    public void addRow() {
        System.out.println("=== insert record ====");
        dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_NAME, "Hongten");
        dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_AGE, "22");

        dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_NAME, "Tom");
        dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_AGE, "25");

        dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_NAME, "Jone");
        dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_AGE, "30");

        dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_NAME, "Jobs");
        dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_AGE, "24");
    }

    public void getRow() {
        System.out.println("=== get record ====");
        dao.getRow(tableName, "12345566");
    }

    public void getAllRows() {
        System.out.println("=== scan table ====");
        dao.getAllRows(tableName);
    }

    public void getRowsByRange() {
        System.out.println("=== scan record by giving range ====");
        // returns the '12345567' and '12345568' rows (the stop row '12345569' is excluded)
        dao.getRowsByRange(tableName, "12345567", "12345569");
    }

    public void delRow() {
        System.out.println("=== delete record ====");
        dao.delRow(tableName, "12345568");
        // only the '12345567' row is left in the range
        getRowsByRange();
    }

    public void delRowsByRowKeys() {
        System.out.println("=== delete batch records ====");
        List<String> rowKeys = new ArrayList<String>();
        rowKeys.add("12345566");
        rowKeys.add("12345569");
        dao.delRowsByRowKeys(tableName, rowKeys);
        // '12345566' and '12345569' can no longer be found
        getAllRows();
    }

    public void deleteHTable() {
        System.out.println("=== delete table ====");
        dao.deleteHTable(tableName);
    }
}
Test results:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
=== create table ====
The table [t_test] is created.
=== insert record ====
Insert into table [t_test], Rowkey=[12345566], Column=[cf1:name], Value=[Hongten].
Insert into table [t_test], Rowkey=[12345566], Column=[cf1:age], Value=[22].
Insert into table [t_test], Rowkey=[12345567], Column=[cf1:name], Value=[Tom].
Insert into table [t_test], Rowkey=[12345567], Column=[cf1:age], Value=[25].
Insert into table [t_test], Rowkey=[12345568], Column=[cf1:name], Value=[Jone].
Insert into table [t_test], Rowkey=[12345568], Column=[cf1:age], Value=[30].
Insert into table [t_test], Rowkey=[12345569], Column=[cf1:name], Value=[Jobs].
Insert into table [t_test], Rowkey=[12345569], Column=[cf1:age], Value=[24].
=== get record ====
Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Value=[22].
Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Value=[Hongten].
=== scan table ====
Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Value=[22].
Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Value=[Hongten].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Value=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Value=[Tom].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Value=[30].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Value=[Jone].
Get from table [t_test], Rowkey=[12345569], Column=[cf1:age], Timestamp=[1541652952928], Value=[24].
Get from table [t_test], Rowkey=[12345569], Column=[cf1:name], Timestamp=[1541652952869], Value=[Jobs].
=== scan record by giving range ====
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Value=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Value=[Tom].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Value=[30].
Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Value=[Jone].
=== delete record ====
Delete from table [t_test], Rowkey=[12345568].
=== scan record by giving range ====
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Value=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Value=[Tom].
=== delete batch records ====
Delete from table [t_test], Rowkey=[12345566].
Delete from table [t_test], Rowkey=[12345569].
=== scan table ====
Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Value=[25].
Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Value=[Tom].
=== delete table ====
The table [t_test] is deleted.
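A side note on the @SuppressWarnings("deprecation") in HbaseDaoImpl: HBaseAdmin, HTablePool and HTableInterface are older client classes from the 0.98 era. If you are on HBase 1.0 or later, the same insert can be written against the Connection/Table API instead; a minimal sketch, with the table and row names taken from the example above for illustration only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ModernClientSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");

        // one heavyweight Connection per application; Table instances are lightweight and short-lived
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("t_test"));
        try {
            Put put = new Put(Bytes.toBytes("12345566"));
            put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes("Hongten"));
            table.put(put);
        } finally {
            table.close();
            connection.close();
        }
    }
}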
Source code download: