HBase學習筆記1-HConnection性能研究

来源:http://www.cnblogs.com/xczyd/archive/2016/06/12/5577124.html
-Advertisement-
Play Games

客戶端通過構建HTable對象來與HBase集群交互。 要創建HTable對象,首先要創建一個帶有HBase集群信息的配置對象Configuration conf,其一般創建方法如下: 在擁有了conf之後,可以通過HTable提供的如下兩種構造方法來創建HTable對象: (1)直接利用conf來 ...


客戶端通過構建HTable對象來與HBase集群交互。

要創建HTable對象,首先要創建一個帶有HBase集群信息的配置對象Configuration conf,其一般創建方法如下:

Configuration conf = HBaseConfiguration.create();
//設置HBase集群的IP和埠
conf.set("hbase.zookeeper.quorum", "10.172.1.61");
conf.set("hbase.zookeeper.property.clientPort", "2181");

在擁有了conf之後,可以通過HTable提供的如下兩種構造方法來創建HTable對象:

(1)直接利用conf來創建HTable對象,對應的構造函數如下:

public HTable(Configuration conf, final TableName tableName)
  throws IOException {
    this.tableName = tableName;
    this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
    if (conf == null) {
      this.connection = null;
      return;
    }
    this.connection = HConnectionManager.getConnection(conf);
    this.configuration = conf;

    this.pool = getDefaultExecutor(conf);
    this.finishSetup();
 }

註意紅色部分的代碼。這種構造方法實際上調用了HConnectionManager的getConnection函數,來獲取了一個HConnection對象。一般使用Java的API進行資料庫操作的時候,都會創建一個類似的對象來維護一些資料庫連接相關的信息(熟悉odbc,jdbc的話這一塊就沒有理解問題)。getConnection函數的具體實現如下:

public static HConnection getConnection(final Configuration conf)
  throws IOException {
    HConnectionKey connectionKey = new HConnectionKey(conf);
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection == null) {
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      } else if (connection.isClosed()) {
        HConnectionManager.deleteConnection(connectionKey, true);
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      }
      connection.incCount();
      return connection;
    }
}

其中,CONNECTION_INSTANCES的類型是LinkedHashMap<HConnectionKey,HConnectionImplementation>。同樣註意紅色部分的三行代碼。第一行,根據conf信息創建了一個HConnectionKey的對象;第二行,去CONNECTION_INSTANCES中查找是否存在剛纔創建的HConnectionKey;第三行,如果不存在,那麼調用createConnection來創建一個HConnection的對象,否則直接返回剛纔從Map中查找得到的HConnection對象

不嫌麻煩,再看一下HConnectionKey的構造函數和重寫的hashCode函數,代碼分別如下:

HConnectionKey(Configuration conf) {
    Map<String, String> m = new HashMap<String, String>();
    if (conf != null) {
      for (String property : CONNECTION_PROPERTIES) {
        String value = conf.get(property);
        if (value != null) {
          m.put(property, value);
        }
      }
    }
    this.properties = Collections.unmodifiableMap(m);

    try {
      UserProvider provider = UserProvider.instantiate(conf);
      User currentUser = provider.getCurrent();
      if (currentUser != null) {
        username = currentUser.getName();
      }
    } catch (IOException ioe) {
      HConnectionManager.LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
    }
}
public int hashCode() {
    final int prime = 31;
    int result = 1;
    if (username != null) {
      result = username.hashCode();
    }
    for (String property : CONNECTION_PROPERTIES) {
      String value = properties.get(property);
      if (value != null) {
        result = prime * result + value.hashCode();
      }
    }

    return result;
}

可以看到,hashCode函數被重寫以後,其返回值實際上是username的hashCode函數的返回值,而username來自於currentuser,currentuser又來自於provider,provider是由conf創建的。可以看出,只要有相同的conf,就能創建出相同的username,也就能保證HConnectionKey的hashCode函數被重寫以後,能夠在username相同時返回相同的值。而CONNECTION_INSTANCES是一個LinkedHashMap,其get函數會調用HConnectionKey的hashCode函數來判斷該對象是否已經存在。因此,getConnection函數的本質就是根據conf信息返回connection對象,對每一個內容相同的conf,只會返回一個connection

(2)調用createConnection方法來顯式地創建connection,再使用connection來創建HTable對象。createConnection方法和Htable對應的構造函數分別如下:

public static HConnection createConnection(Configuration conf) throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, null, provider.getCurrent());
}

static HConnection createConnection(final Configuration conf, final boolean managed,final ExecutorService pool, final User user)
throws IOException { String className = conf.get("hbase.client.connection.impl",HConnectionManager.HConnectionImplementation.class.getName()); Class<?> clazz = null; try { clazz = Class.forName(className); } catch (ClassNotFoundException e) { throw new IOException(e); } try { // Default HCM#HCI is not accessible; make it so before invoking. Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, boolean.class, ExecutorService.class, User.class); constructor.setAccessible(true); return (HConnection) constructor.newInstance(conf, managed, pool, user); } catch (Exception e) { throw new IOException(e); } }
public HTable(TableName tableName, HConnection connection) throws IOException {
    this.tableName = tableName;
    this.cleanupPoolOnClose = true;
    this.cleanupConnectionOnClose = false;
    this.connection = connection;
    this.configuration = connection.getConfiguration();

    this.pool = getDefaultExecutor(this.configuration);
    this.finishSetup();
 }

可以看出,這樣的話每次創建HTable對象,都需要創建一個新的HConnection對象,而不像方法(1)中那樣共用一個HConnection對象。

 

那麼,上述兩種方法,在執行插入/刪除/查找的時候,性能如何呢?先從代碼角度分析一下。為了簡便,先分析HTable在執行put(插入)操作時具體做的事情。

HTable的put函數如下:

public void put(final Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    doPut(put);
    if (autoFlush) {
      flushCommits();
    }
}

private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    if (ap.hasError()){
      writeAsyncBuffer.add(put);
      backgroundFlushCommits(true);
    }

    validatePut(put);

    currentWriteBufferSize += put.heapSize();
    writeAsyncBuffer.add(put);

    while (currentWriteBufferSize > writeBufferSize) {
      backgroundFlushCommits(false);
    }
}

private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    try {
      do {
        ap.submit(writeAsyncBuffer, true);
      } while (synchronous && !writeAsyncBuffer.isEmpty());

      if (synchronous) {
        ap.waitUntilDone();
      }

      if (ap.hasError()) {
        LOG.debug(tableName + ": One or more of the operations have failed -" +
            " waiting for all operation in progress to finish (successfully or not)");
        while (!writeAsyncBuffer.isEmpty()) {
          ap.submit(writeAsyncBuffer, true);
        }
        ap.waitUntilDone();

        if (!clearBufferOnFail) {
          // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
          //  write buffer. This is a questionable feature kept here for backward compatibility
          writeAsyncBuffer.addAll(ap.getFailedOperations());
        }
        RetriesExhaustedWithDetailsException e = ap.getErrors();
        ap.clearErrors();
        throw e;
      }
    } finally {
      currentWriteBufferSize = 0;
      for (Row mut : writeAsyncBuffer) {
        if (mut instanceof Mutation) {
          currentWriteBufferSize += ((Mutation) mut).heapSize();
        }
      }
    }
}

如紅色部分所表示,調用順序是put->doPut->backgroundFlushCommits->ap.submit,其中ap是類AsyncProcess的對象。因此追蹤到AsynvProcess類,其代碼如下:

public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
    submitLowPriority(rows, atLeastOne, false);
}

public void submitLowPriority(List<? extends Row> rows, boolean atLeastOne, boolean isLowPripority) throws InterruptedIOException {
    if (rows.isEmpty()) {
      return;
    }

    // This looks like we are keying by region but HRegionLocation has a comparator that compares
    // on the server portion only (hostname + port) so this Map collects regions by server.
    Map<HRegionLocation, MultiAction<Row>> actionsByServer = new HashMap<HRegionLocation, MultiAction<Row>>();
    List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());

    long currentTaskCnt = tasksDone.get();
    boolean alreadyLooped = false;

    NonceGenerator ng = this.hConnection.getNonceGenerator();
    do {
      if (alreadyLooped){
        // if, for whatever reason, we looped, we want to be sure that something has changed.
        waitForNextTaskDone(currentTaskCnt);
        currentTaskCnt = tasksDone.get();
      } else {
        alreadyLooped = true;
      }

      // Wait until there is at least one slot for a new task.
      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);

      // Remember the previous decisions about regions or region servers we put in the
      //  final multi.
      Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
      Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();

      int posInList = -1;
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc = findDestLocation(r, posInList);

        if (loc == null) { // loc is null if there is an error such as meta not available.
          it.remove();
        } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
          Action<Row> action = new Action<Row>(r, ++posInList);
          setNonce(ng, r, action);
          retainedActions.add(action);
          addAction(loc, action, actionsByServer, ng);
          it.remove();
        }
      }
    } while (retainedActions.isEmpty() && atLeastOne && !hasError());

    HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
    sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, isLowPripority);
}

private HRegionLocation findDestLocation(Row row, int posInList) {
  if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
  HRegionLocation loc = null;
  IOException locationException = null;
  try {
    loc = hConnection.locateRegion(this.tableName, row.getRow());
    if (loc == null) {
      locationException = new IOException("#" + id + ", no location found, aborting submit for" +
          " tableName=" + tableName +
          " rowkey=" + Arrays.toString(row.getRow()));
    }
  } catch (IOException e) {
    locationException = e;
  }
  if (locationException != null) {
    // There are multiple retries in locateRegion already. No need to add new.
    // We can't continue with this row, hence it's the last retry.
    manageError(posInList, row, false, locationException, null);
    return null;
  }

  return loc;
}

這樣就真相大白了。HConnection在HTable的put操作中,只是起到一個定位RegionServer的作用,在這之後,操作都由RegionServer與cilent端交互。因此,只要client端不是非常頻繁地切換region,調用HConnection的次數就應當遠小於執行put操作的次數。這個結論在插入/查詢/刪除中是一致的。

代碼分析完畢,簡單做一個實驗來驗證上述論斷:

環境:四台linux 64G伺服器組成的HBase集群,連接速度平均5ms

實驗代碼如下:

public class TestHbaseConection {

    public static void main(String[] args) throws Exception{

        Configuration conf = HBaseConfiguration.create();

        conf.set("hbase.zookeeper.quorum", "10.172.1.16");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
      
        //創建Hbase表的參數
        String tableNamePrefix = "testTable";
        String[] colNames = new String[2];
        colNames[0] = "grad";
        colNames[1] = "course";

        for(int i=0;i<100;i++){
            createTable(tableNamePrefix+i,colNames,conf);
        }

        
        for(int i=0;i<100;i++){
            //通過共用connection來執行插入操作
            new Thread(new WriteThread(conf,"CREATEWITHCONF",60000L,tableNamePrefix+i,colNames)).start();
            //通過單獨創建connection來執行插入操作
            //new Thread(new WriteThread(conf,"CREATEWITHCONN",60000L,tableNamePrefix+i,colNames)).start();
        }
    }

    public static void createTable(String tableName,String[] colNames,Configuration conf) {
        System.out.println("start create table "+tableName);
        try {
            HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
            if (hBaseAdmin.tableExists(tableName)) {// 如果存在要創建的表,那麼先刪除,再創建
          hBaseAdmin.disableTable(tableName);
          hBaseAdmin.deleteTable(tableName);
          System.out.println(tableName + " is exist");
          return;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
            for(int i=0;i<colNames.length;i++) {
                tableDescriptor.addFamily(new HColumnDescriptor(colNames[i]));
            }
            hBaseAdmin.createTable(tableDescriptor);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        System.out.println("end create table "+tableName);
    }

}

class WriteThread implements Runnable{

    private Configuration conf;
    private String type;
    private long lifeTime;
    private String tableName;
    private String[] colNames;

    private String threadName;

    public WriteThread(Configuration conf,String type,long lifeTime,String tableName,String[] colNames){
        this.conf = conf;
        this.type = type;
        this.lifeTime = lifeTime;
        this.tableName = tableName;
        this.colNames = colNames;
    }

    @Override
    public void run(){

        threadName = Thread.currentThread().getName();
        int count = 0;

        System.out.println(threadName+": started");

        try {
            //create connection for each thread
            if (type.equals("CREATEWITHCONN")) {
                //create htable with connection directly
                HConnection conn = HConnectionManager.createConnection(conf);
                HTable table = new HTable(TableName.valueOf(tableName),conn);
                HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();

                long start = System.currentTimeMillis();
                long end = System.currentTimeMillis();

                while(end-start<=lifeTime){
                    Put put = generatePut(threadName,columnFamilies,count);
                    table.put(put);
                    count++;
                    end = System.currentTimeMillis();
                }

                conn.close();
            }
            else if (type.equals("CREATEWITHCONF")) {
                //create htable with conf
                HTable table = new HTable(conf,tableName);
                HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();

                long start = System.currentTimeMillis();
                long end = System.currentTimeMillis();

                while(end-start<=lifeTime){
                    Put put = generatePut(threadName,columnFamilies,count);
                    table.put(put);
                    count++;
                    end = System.currentTimeMillis();
                }
            }
            else {
                return;
            }
        }catch(Exception ex) {
            ex.printStackTrace();
        }
        System.out.println(threadName+": ended with operation num:"+count);
    }

    private Put generatePut(String threadName,HColumnDescriptor[] columnFamilies,int count){
        Put put = new Put(Bytes.toBytes(threadName+"_"+count));
        for (int i = 0; i < columnFamilies.length; i++) {
            String familyName = columnFamilies[i].getNameAsString();
            //System.out.println("familyName:"+familyName);
            for(int j=0;j<colNames.length;j++){
                if(familyName.equals(colNames[j])) { // grad列族put數據
                    String columnName = familyName+(int)(Math.floor(Math.random()*5+10*j));
                    String val = ""+columnName.hashCode()%100;
                    put.add(Bytes.toBytes(familyName),Bytes.toBytes(columnName),Bytes.toBytes(val));
                }
            }
        }
        return put;
    }
}                

簡單來說就是先創建100張有兩列的HBase表,然後分別採用getConnection策略和createConnection策略來寫1分鐘的數據,當然寫幾張表,寫多久,寫什麼都可以調整。

測試了幾次,使用getConnection策略時,每個線程每分鐘寫入量大概在2400~2800條左右;使用createConnection策略時,每個線程每分鐘寫入量大概在1200~1800條左右。註意此處實驗時,為了防止線程之間搶奪資源,已經令它們在不同的region上(實際上是不同的表上)進行操作了。如果在同一個region上進行操作(稍微修改實驗代碼就能做到),則性能差別更為明顯:getConnection每個線程每分鐘寫入量3500~5000,createConnection每個線程每分鐘寫入量1000~1200。總的來說,region越少,線程越多,getConnection策略越有利。猜想造成這種情況的原因是createConnection線程過多可能會導致服務端負載過大,即便是多個redionServer在負責具體的寫操作,也仍舊會導致性能下降。還有一點值得註意的是,createConnection策略需要顯式地關閉某個連接,否則它將持續地占有資源,甚至導致記憶體泄露。因此,建議大家在使用Java API與HBase交互時,儘量使用getConnection的辦法去創建HTable對象,避免浪費資源。

 

 

  

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.下拉框 select : 移除option $("#ID option").each(function(){ if($(this).val() == 111){ $(this).remove(); } }); 添加option $("<option value='111'>UPS Ground< ...
  • 在js裡面,對於函數的調用,實際上也是也是面向對象的思路,於是寫好js函數,也是考核面向對象設計的能力,同時也必須考慮到如何實現高內聚和低耦合,拿一個例子來說,現在的需求是這樣的,實現個投資進度框,就是如圖所示:總共分四步來走,第一步“創建訂單中”,成功改變提示信息“創建訂單成功!”,顯示,不成功改 ...
  • Sencha ExtJS 6 在UI上非常強大,這裡介紹一個widget Grid示例來說明如何來在表格中顯示精度條和迷你圖... ...
  • CSS 有兩個說不上常用的偽類 :before 和 :after,偶爾會被人用來添加些自定義格式什麼的,但是它們的功用不僅於此。前幾天發現了 Creative Link Effects 這個非常有意思的介紹創意鏈接特效的頁面,裡面驚人的效果大量使用到的特性除了 transform 屬性進行變形之外, ...
  • 在使用UeEditor中遇到幾個個坑 1.添加的html代碼中使用的樣式class被guolv掉 解決方案:在ueditor.config.js中,xss過濾白名單中,每個元素添加class,如下圖 2.伺服器端許可權問題 解決方案:在controller.ashx(我用的.net版本)中添加相關的權 ...
  • 今天寫問卷的時候遇到個label點擊的時候,監聽的click事件被執行兩次;產生這個的原因麽。。。事件冒泡 然後麽找了下方法。。。 方法一:把label扔了。。。 然後方法二 只認input,判斷事件源為input(這是網上有人貼出來的解決方法)http://www.cnblogs.com/feng ...
  • border radius 圓角是做網頁永遠繞不過的話題,以前基本是通過背景圖片做的,有了 CSS3 以後通過屬性就 能夠搞定,我們可以通過 border radius 設置元素的圓角半徑。 對於一個正方形,我們只需要設置為邊長的一半就可以呈現一個圓。 "代碼" border radius 是縮寫的 ...
  • JS自動類型轉換 var a = 1; var b = true; "==" 表示 可以自動類型轉換,比較的是數值 " " 表示可以自動類型轉換,先比較數值,再比較類型 if (a == b) { alert("相等"); //列印 }else{ alert("不等"); } 三目運算 var c ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...