A client interacts with an HBase cluster by constructing an HTable object.
To create an HTable object, you first need a Configuration object conf carrying the HBase cluster information. It is typically created like this:
Configuration conf = HBaseConfiguration.create();
// Set the address and port of the HBase cluster's ZooKeeper quorum
conf.set("hbase.zookeeper.quorum", "10.172.1.61");
conf.set("hbase.zookeeper.property.clientPort", "2181");
Once you have conf, HTable offers two constructors for creating an HTable object:
(1) Create the HTable object directly from conf. The corresponding constructor is:
public HTable(Configuration conf, final TableName tableName) throws IOException {
    this.tableName = tableName;
    this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
    if (conf == null) {
        this.connection = null;
        return;
    }
    this.connection = HConnectionManager.getConnection(conf);
    this.configuration = conf;
    this.pool = getDefaultExecutor(conf);
    this.finishSetup();
}
Note the call to HConnectionManager.getConnection: this constructor obtains an HConnection object from HConnectionManager. When working with a database through a Java API, one generally creates a similar object to hold connection-related state (this will be familiar if you know ODBC or JDBC). The implementation of getConnection is as follows:
public static HConnection getConnection(final Configuration conf) throws IOException {
    HConnectionKey connectionKey = new HConnectionKey(conf);
    synchronized (CONNECTION_INSTANCES) {
        HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
        if (connection == null) {
            connection = (HConnectionImplementation) createConnection(conf, true);
            CONNECTION_INSTANCES.put(connectionKey, connection);
        } else if (connection.isClosed()) {
            HConnectionManager.deleteConnection(connectionKey, true);
            connection = (HConnectionImplementation) createConnection(conf, true);
            CONNECTION_INSTANCES.put(connectionKey, connection);
        }
        connection.incCount();
        return connection;
    }
}
Here, CONNECTION_INSTANCES is a LinkedHashMap<HConnectionKey, HConnectionImplementation>. Again, note the three key steps: first, an HConnectionKey object is created from conf; second, CONNECTION_INSTANCES is consulted to see whether a connection for that key already exists; third, if none exists (or the cached one is closed), createConnection is called to build a new HConnection, otherwise the HConnection found in the map is returned directly.
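The caching behavior of getConnection can be sketched without the HBase classes. The following is a simplified stand-in (Conn and ConnectionCacheSketch are illustrative names, not HBase's): connections are pooled in a map keyed by the connection-relevant configuration values, reference-counted, and recreated only when missing or closed.

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectionCacheSketch {
    // Stand-in for HConnectionImplementation.
    public static class Conn {
        int refCount = 0;
        boolean closed = false;
        void incCount() { refCount++; }
    }

    // Stand-in for CONNECTION_INSTANCES.
    private static final Map<String, Conn> CONNECTION_INSTANCES = new HashMap<>();

    // Key derived from the connection-relevant properties, mirroring HConnectionKey.
    public static synchronized Conn getConnection(String quorum, String clientPort) {
        String key = quorum + ":" + clientPort;
        Conn c = CONNECTION_INSTANCES.get(key);
        if (c == null || c.closed) {
            c = new Conn();                 // analogue of createConnection(conf, true)
            CONNECTION_INSTANCES.put(key, c);
        }
        c.incCount();                       // reference counting, as in the real code
        return c;
    }
}
```

Two requests with the same quorum and port return the same object; a different quorum yields a fresh one.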
To be thorough, let us also look at HConnectionKey's constructor and its overridden hashCode method:
HConnectionKey(Configuration conf) {
    Map<String, String> m = new HashMap<String, String>();
    if (conf != null) {
        for (String property : CONNECTION_PROPERTIES) {
            String value = conf.get(property);
            if (value != null) {
                m.put(property, value);
            }
        }
    }
    this.properties = Collections.unmodifiableMap(m);
    try {
        UserProvider provider = UserProvider.instantiate(conf);
        User currentUser = provider.getCurrent();
        if (currentUser != null) {
            username = currentUser.getName();
        }
    } catch (IOException ioe) {
        HConnectionManager.LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
    }
}
public int hashCode() {
    final int prime = 31;
    int result = 1;
    if (username != null) {
        result = username.hashCode();
    }
    for (String property : CONNECTION_PROPERTIES) {
        String value = properties.get(property);
        if (value != null) {
            result = prime * result + value.hashCode();
        }
    }
    return result;
}
As the code shows, the overridden hashCode seeds its result with the hash of username and then folds in the values of the connection-related properties copied from conf. The username comes from currentUser, currentUser comes from provider, and provider is instantiated from conf. So any two conf objects with the same content produce the same username and the same property values, which guarantees that HConnectionKey's hashCode returns the same value for equivalent configurations. CONNECTION_INSTANCES is a LinkedHashMap, whose get method uses HConnectionKey's hashCode (together with equals) to decide whether a matching key already exists. The essence of getConnection, then, is to return a connection keyed by the content of conf: for every conf with identical content, only one connection is ever created.
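To see how identical configurations end up on the same map key, here is a minimal stand-alone sketch of the hashing scheme (ConnectionKeySketch is a hypothetical class; the real HConnectionKey also overrides equals and reads its properties from a Configuration object):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ConnectionKeySketch {
    // A subset of the properties the real CONNECTION_PROPERTIES tracks.
    static final List<String> CONNECTION_PROPERTIES = Arrays.asList(
            "hbase.zookeeper.quorum", "hbase.zookeeper.property.clientPort");

    public static int keyHash(String username, Map<String, String> conf) {
        final int prime = 31;
        int result = 1;
        if (username != null) {
            result = username.hashCode();       // seed with the user, as in the real code
        }
        for (String property : CONNECTION_PROPERTIES) {
            String value = conf.get(property);
            if (value != null) {
                result = prime * result + value.hashCode();  // fold in each property value
            }
        }
        return result;
    }
}
```

Two maps with identical quorum and port hash identically; changing the quorum changes the hash.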
(2) Call createConnection explicitly to create a connection, then use that connection to create the HTable object. The createConnection method and the corresponding HTable constructor are as follows:
public static HConnection createConnection(Configuration conf) throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, null, provider.getCurrent());
}

static HConnection createConnection(final Configuration conf, final boolean managed,
        final ExecutorService pool, final User user) throws IOException {
    String className = conf.get("hbase.client.connection.impl",
            HConnectionManager.HConnectionImplementation.class.getName());
    Class<?> clazz = null;
    try {
        clazz = Class.forName(className);
    } catch (ClassNotFoundException e) {
        throw new IOException(e);
    }
    try {
        // Default HCM#HCI is not accessible; make it so before invoking.
        Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
                boolean.class, ExecutorService.class, User.class);
        constructor.setAccessible(true);
        return (HConnection) constructor.newInstance(conf, managed, pool, user);
    } catch (Exception e) {
        throw new IOException(e);
    }
}
public HTable(TableName tableName, HConnection connection) throws IOException {
    this.tableName = tableName;
    this.cleanupPoolOnClose = true;
    this.cleanupConnectionOnClose = false;
    this.connection = connection;
    this.configuration = connection.getConfiguration();
    this.pool = getDefaultExecutor(this.configuration);
    this.finishSetup();
}
As you can see, with this approach every HTable object is backed by a newly created HConnection, rather than sharing a single HConnection as in approach (1).
So how do the two approaches compare in performance when executing inserts, deletes, and reads? Let us first analyze the code. For simplicity, we look at what HTable actually does during a put (insert) operation.
HTable's put method is as follows:
public void put(final Put put) throws InterruptedIOException,
        RetriesExhaustedWithDetailsException {
    doPut(put);
    if (autoFlush) {
        flushCommits();
    }
}

private void doPut(Put put) throws InterruptedIOException,
        RetriesExhaustedWithDetailsException {
    if (ap.hasError()) {
        writeAsyncBuffer.add(put);
        backgroundFlushCommits(true);
    }
    validatePut(put);
    currentWriteBufferSize += put.heapSize();
    writeAsyncBuffer.add(put);
    while (currentWriteBufferSize > writeBufferSize) {
        backgroundFlushCommits(false);
    }
}

private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
        RetriesExhaustedWithDetailsException {
    try {
        do {
            ap.submit(writeAsyncBuffer, true);
        } while (synchronous && !writeAsyncBuffer.isEmpty());
        if (synchronous) {
            ap.waitUntilDone();
        }
        if (ap.hasError()) {
            LOG.debug(tableName + ": One or more of the operations have failed -"
                    + " waiting for all operation in progress to finish (successfully or not)");
            while (!writeAsyncBuffer.isEmpty()) {
                ap.submit(writeAsyncBuffer, true);
            }
            ap.waitUntilDone();
            if (!clearBufferOnFail) {
                // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
                // write buffer. This is a questionable feature kept here for backward compatibility
                writeAsyncBuffer.addAll(ap.getFailedOperations());
            }
            RetriesExhaustedWithDetailsException e = ap.getErrors();
            ap.clearErrors();
            throw e;
        }
    } finally {
        currentWriteBufferSize = 0;
        for (Row mut : writeAsyncBuffer) {
            if (mut instanceof Mutation) {
                currentWriteBufferSize += ((Mutation) mut).heapSize();
            }
        }
    }
}
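The buffering behavior of doPut can be illustrated with a small self-contained model (WriteBufferSketch is illustrative, not the real HTable): mutations accumulate in a client-side buffer and are only submitted to the server once the buffered heap size exceeds writeBufferSize, so most puts cost no network round trip at all.

```java
import java.util.ArrayList;
import java.util.List;

public class WriteBufferSketch {
    private final long writeBufferSize;
    private long currentWriteBufferSize = 0;
    private final List<Long> writeAsyncBuffer = new ArrayList<>(); // heap sizes of buffered puts
    public int flushCount = 0;                                     // simulated server round trips

    public WriteBufferSketch(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    // Analogue of doPut: buffer the mutation, flush only once over the limit.
    public void put(long heapSize) {
        currentWriteBufferSize += heapSize;
        writeAsyncBuffer.add(heapSize);
        while (currentWriteBufferSize > writeBufferSize) {
            backgroundFlushCommits();
        }
    }

    // Analogue of backgroundFlushCommits: one batched AsyncProcess.submit call.
    private void backgroundFlushCommits() {
        flushCount++;
        writeAsyncBuffer.clear();
        currentWriteBufferSize = 0;
    }
}
```

With a 100-byte buffer and 30-byte puts, only every fourth put triggers a flush.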
As the code shows, the call chain is put -> doPut -> backgroundFlushCommits -> ap.submit, where ap is an instance of AsyncProcess. Following the chain into the AsyncProcess class:
public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
    submitLowPriority(rows, atLeastOne, false);
}

public void submitLowPriority(List<? extends Row> rows, boolean atLeastOne,
        boolean isLowPripority) throws InterruptedIOException {
    if (rows.isEmpty()) {
        return;
    }
    // This looks like we are keying by region but HRegionLocation has a comparator that compares
    // on the server portion only (hostname + port) so this Map collects regions by server.
    Map<HRegionLocation, MultiAction<Row>> actionsByServer =
            new HashMap<HRegionLocation, MultiAction<Row>>();
    List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
    long currentTaskCnt = tasksDone.get();
    boolean alreadyLooped = false;
    NonceGenerator ng = this.hConnection.getNonceGenerator();
    do {
        if (alreadyLooped) {
            // if, for whatever reason, we looped, we want to be sure that something has changed.
            waitForNextTaskDone(currentTaskCnt);
            currentTaskCnt = tasksDone.get();
        } else {
            alreadyLooped = true;
        }
        // Wait until there is at least one slot for a new task.
        waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
        // Remember the previous decisions about regions or region servers we put in the
        // final multi.
        Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
        Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
        int posInList = -1;
        Iterator<? extends Row> it = rows.iterator();
        while (it.hasNext()) {
            Row r = it.next();
            HRegionLocation loc = findDestLocation(r, posInList);
            if (loc == null) {
                // loc is null if there is an error such as meta not available.
                it.remove();
            } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
                Action<Row> action = new Action<Row>(r, ++posInList);
                setNonce(ng, r, action);
                retainedActions.add(action);
                addAction(loc, action, actionsByServer, ng);
                it.remove();
            }
        }
    } while (retainedActions.isEmpty() && atLeastOne && !hasError());
    HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
    sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, isLowPripority);
}

private HRegionLocation findDestLocation(Row row, int posInList) {
    if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
    HRegionLocation loc = null;
    IOException locationException = null;
    try {
        loc = hConnection.locateRegion(this.tableName, row.getRow());
        if (loc == null) {
            locationException = new IOException("#" + id + ", no location found, aborting submit for"
                    + " tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
        }
    } catch (IOException e) {
        locationException = e;
    }
    if (locationException != null) {
        // There are multiple retries in locateRegion already. No need to add new.
        // We can't continue with this row, hence it's the last retry.
        manageError(posInList, row, false, locationException, null);
        return null;
    }
    return loc;
}
This makes everything clear. In HTable's put operation, the HConnection serves only to locate the RegionServer hosting a row's region; after that, the client interacts with the RegionServer directly. So as long as the client does not switch regions very frequently, the number of HConnection calls should be far smaller than the number of put operations. The same conclusion holds for inserts, reads, and deletes alike.
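This division of labor can be modeled with a toy location cache (all names here are illustrative; the real client caches region locations inside the connection implementation): the connection is consulted once per region, and repeated puts to the same region hit the cache.

```java
import java.util.HashMap;
import java.util.Map;

public class RegionLocatorSketch {
    private final Map<String, String> locationCache = new HashMap<>();
    public int locateRegionCalls = 0;   // how often we actually "ask" the connection

    // Hypothetical region scheme: the first character of the row key picks the region.
    public String locateRegion(String rowKey) {
        String region = rowKey.substring(0, 1);
        return locationCache.computeIfAbsent(region, r -> {
            locateRegionCalls++;        // cache miss: consult the connection once
            return "server-for-" + r;
        });
    }
}
```

Writing 200 rows spread over two regions touches the "connection" only twice.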
With the code analysis done, a simple experiment verifies the claim.
Environment: an HBase cluster of four Linux servers with 64 GB of RAM each; average connection latency about 5 ms.
The test code is as follows:
public class TestHbaseConnection {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "10.172.1.16");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // Parameters for creating the HBase tables
        String tableNamePrefix = "testTable";
        String[] colNames = new String[2];
        colNames[0] = "grad";
        colNames[1] = "course";
        for (int i = 0; i < 100; i++) {
            createTable(tableNamePrefix + i, colNames, conf);
        }
        for (int i = 0; i < 100; i++) {
            // Insert via a shared connection (getConnection strategy)
            new Thread(new WriteThread(conf, "CREATEWITHCONF", 60000L, tableNamePrefix + i, colNames)).start();
            // Insert via a dedicated connection (createConnection strategy)
            //new Thread(new WriteThread(conf, "CREATEWITHCONN", 60000L, tableNamePrefix + i, colNames)).start();
        }
    }

    public static void createTable(String tableName, String[] colNames, Configuration conf) {
        System.out.println("start create table " + tableName);
        try {
            HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
            if (hBaseAdmin.tableExists(tableName)) { // if the table already exists, delete it first
                hBaseAdmin.disableTable(tableName);
                hBaseAdmin.deleteTable(tableName);
                System.out.println(tableName + " is exist");
                return;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
            for (int i = 0; i < colNames.length; i++) {
                tableDescriptor.addFamily(new HColumnDescriptor(colNames[i]));
            }
            hBaseAdmin.createTable(tableDescriptor);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        System.out.println("end create table " + tableName);
    }
}

class WriteThread implements Runnable {
    private Configuration conf;
    private String type;
    private long lifeTime;
    private String tableName;
    private String[] colNames;
    private String threadName;

    public WriteThread(Configuration conf, String type, long lifeTime, String tableName, String[] colNames) {
        this.conf = conf;
        this.type = type;
        this.lifeTime = lifeTime;
        this.tableName = tableName;
        this.colNames = colNames;
    }

    @Override
    public void run() {
        threadName = Thread.currentThread().getName();
        int count = 0;
        System.out.println(threadName + ": started");
        try {
            // create a connection per thread
            if (type.equals("CREATEWITHCONN")) {
                // create the HTable from an explicitly created connection
                HConnection conn = HConnectionManager.createConnection(conf);
                HTable table = new HTable(TableName.valueOf(tableName), conn);
                HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();
                long start = System.currentTimeMillis();
                long end = System.currentTimeMillis();
                while (end - start <= lifeTime) {
                    Put put = generatePut(threadName, columnFamilies, count);
                    table.put(put);
                    count++;
                    end = System.currentTimeMillis();
                }
                conn.close();
            } else if (type.equals("CREATEWITHCONF")) {
                // create the HTable directly from conf
                HTable table = new HTable(conf, tableName);
                HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();
                long start = System.currentTimeMillis();
                long end = System.currentTimeMillis();
                while (end - start <= lifeTime) {
                    Put put = generatePut(threadName, columnFamilies, count);
                    table.put(put);
                    count++;
                    end = System.currentTimeMillis();
                }
            } else {
                return;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        System.out.println(threadName + ": ended with operation num:" + count);
    }

    private Put generatePut(String threadName, HColumnDescriptor[] columnFamilies, int count) {
        Put put = new Put(Bytes.toBytes(threadName + "_" + count));
        for (int i = 0; i < columnFamilies.length; i++) {
            String familyName = columnFamilies[i].getNameAsString();
            for (int j = 0; j < colNames.length; j++) {
                if (familyName.equals(colNames[j])) {
                    // put data into this column family
                    String columnName = familyName + (int) (Math.floor(Math.random() * 5 + 10 * j));
                    String val = "" + columnName.hashCode() % 100;
                    put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(val));
                }
            }
        }
        return put;
    }
}
In short, the test first creates 100 HBase tables with two column families each, then writes data for one minute using the getConnection strategy and the createConnection strategy respectively; the number of tables, the duration, and the payload are all adjustable.
Across several runs, with the getConnection strategy each thread wrote roughly 2,400-2,800 rows per minute; with the createConnection strategy, roughly 1,200-1,800 rows per minute. Note that in this experiment the threads were made to operate on different regions (in fact, on different tables) to keep them from contending for resources. If they operate on the same region instead (a small change to the test code), the gap widens further: getConnection reaches 3,500-5,000 rows per thread per minute, while createConnection manages 1,000-1,200. In general, the fewer the regions and the more the threads, the more the getConnection strategy wins. A plausible explanation is that too many createConnection threads overload the server side; even with multiple RegionServers handling the actual writes, performance still degrades. Another point worth noting is that the createConnection strategy requires each connection to be closed explicitly; otherwise it keeps holding resources and can even leak memory. Therefore, when using the Java API with HBase, prefer the getConnection approach for creating HTable objects, and avoid wasting resources.