一、HBase過濾器簡介 Hbase 提供了種類豐富的過濾器(filter)來提高數據處理的效率,用戶可以通過內置或自定義的過濾器來對數據進行過濾,所有的過濾器都在服務端生效,即謂詞下推(predicate push down)。這樣可以保證過濾掉的數據不會被傳送到客戶端,從而減輕網路傳輸和客戶端處 ...
一、HBase過濾器簡介
Hbase 提供了種類豐富的過濾器(filter)來提高數據處理的效率,用戶可以通過內置或自定義的過濾器來對數據進行過濾,所有的過濾器都在服務端生效,即謂詞下推(predicate push down)。這樣可以保證過濾掉的數據不會被傳送到客戶端,從而減輕網路傳輸和客戶端處理的壓力。
二、過濾器基礎
2.1 Filter介面和FilterBase抽象類
Filter 介面中定義了過濾器的基本方法,FilterBase 抽象類實現了 Filter 介面。所有內置的過濾器則直接或者間接繼承自 FilterBase 抽象類。用戶只需要將定義好的過濾器通過 setFilter
方法傳遞給 Scan
或 put
的實例即可。
setFilter(Filter filter)
// Scan 中定義的 setFilter
@Override
public Scan setFilter(Filter filter) {
super.setFilter(filter);
return this;
}
// Get 中定義的 setFilter
@Override
public Get setFilter(Filter filter) {
super.setFilter(filter);
return this;
}
FilterBase 的所有子類過濾器如下:
說明:上圖基於當前時間點(2019.4)最新的 Hbase-2.1.4 ,下文所有說明均基於此版本。
2.2 過濾器分類
HBase 內置過濾器可以分為三類:分別是比較過濾器,專用過濾器和包裝過濾器。分別在下麵的三個小節中做詳細的介紹。
三、比較過濾器
所有比較過濾器均繼承自 CompareFilter
。創建一個比較過濾器需要兩個參數,分別是比較運算符和比較器實例。
public CompareFilter(final CompareOp compareOp,final ByteArrayComparable comparator) {
this.compareOp = compareOp;
this.comparator = comparator;
}
3.1 比較運算符
- LESS (<)
- LESS_OR_EQUAL (<=)
- EQUAL (=)
- NOT_EQUAL (!=)
- GREATER_OR_EQUAL (>=)
- GREATER (>)
- NO_OP (排除所有符合條件的值)
比較運算符均定義在枚舉類 CompareOperator
中
@InterfaceAudience.Public
public enum CompareOperator {
LESS,
LESS_OR_EQUAL,
EQUAL,
NOT_EQUAL,
GREATER_OR_EQUAL,
GREATER,
NO_OP,
}
註意:在 1.x 版本的 HBase 中,比較運算符定義在
CompareFilter.CompareOp
枚舉類中,但在 2.0 之後這個類就被標識為 @deprecated ,並會在 3.0 移除。所以 2.0 之後版本的 HBase 需要使用CompareOperator
這個枚舉類。
3.2 比較器
所有比較器均繼承自 ByteArrayComparable
抽象類,常用的有以下幾種:
- BinaryComparator : 使用
Bytes.compareTo(byte [],byte [])
按字典序比較指定的位元組數組。 - BinaryPrefixComparator : 按字典序與指定的位元組數組進行比較,但只比較到這個位元組數組的長度。
- RegexStringComparator : 使用給定的正則表達式與指定的位元組數組進行比較。僅支持
EQUAL
和NOT_EQUAL
操作。 - SubStringComparator : 測試給定的子字元串是否出現在指定的位元組數組中,比較不區分大小寫。僅支持
EQUAL
和NOT_EQUAL
操作。 - NullComparator :判斷給定的值是否為空。
- BitComparator :按位進行比較。
BinaryPrefixComparator
和 BinaryComparator
的區別不是很好理解,這裡舉例說明一下:
在進行 EQUAL
的比較時,如果比較器傳入的是 abcd
的位元組數組,但是待比較數據是 abcdefgh
:
- 如果使用的是
BinaryPrefixComparator
比較器,則比較以abcd
位元組數組的長度為準,即efgh
不會參與比較,這時候認為abcd
與abcdefgh
是滿足EQUAL
條件的; - 如果使用的是
BinaryComparator
比較器,則認為其是不相等的。
3.3 比較過濾器種類
比較過濾器共有五個(Hbase 1.x 版本和 2.x 版本相同),見下圖:
- RowFilter :基於行鍵來過濾數據;
- FamilyFilterr :基於列族來過濾數據;
- QualifierFilterr :基於列限定符(列名)來過濾數據;
- ValueFilterr :基於單元格 (cell) 的值來過濾數據;
- DependentColumnFilter :指定一個參考列來過濾其他列的過濾器,過濾的原則是基於參考列的時間戳來進行篩選 。
前四種過濾器的使用方法相同,均只要傳遞比較運算符和運算器實例即可構建,然後通過 setFilter
方法傳遞給 scan
:
Filter filter = new RowFilter(CompareOperator.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("xxx")));
scan.setFilter(filter);
DependentColumnFilter
的使用稍微複雜一點,這裡單獨做下說明。
3.4 DependentColumnFilter
可以把 DependentColumnFilter
理解為一個 valueFilter 和一個時間戳過濾器的組合。DependentColumnFilter
有三個帶參構造器,這裡選擇一個參數最全的進行說明:
DependentColumnFilter(final byte [] family, final byte[] qualifier,
final boolean dropDependentColumn, final CompareOperator op,
final ByteArrayComparable valueComparator)
- family :列族
- qualifier :列限定符(列名)
- dropDependentColumn :決定參考列是否被包含在返回結果內,為 true 時表示參考列被返回,為 false 時表示被丟棄
- op :比較運算符
- valueComparator :比較器
這裡舉例進行說明:
DependentColumnFilter dependentColumnFilter = new DependentColumnFilter(
Bytes.toBytes("student"),
Bytes.toBytes("name"),
false,
CompareOperator.EQUAL,
new BinaryPrefixComparator(Bytes.toBytes("xiaolan")));
首先會去查找
student:name
中值以xiaolan
開頭的所有數據獲得參考數據集
,這一步等同於 valueFilter 過濾器;其次再用參考數據集中所有數據的時間戳去檢索其他列,獲得時間戳相同的其他列的數據作為
結果數據集
,這一步等同於時間戳過濾器;最後如果
dropDependentColumn
為 true,則返回參考數據集
+結果數據集
,若為 false,則拋棄參考數據集,只返回結果數據集
。
四、專用過濾器
專用過濾器通常直接繼承自 FilterBase
,適用於範圍更小的篩選規則。
4.1 單列列值過濾器 (SingleColumnValueFilter)
基於某列(參考列)的值決定某行數據是否被過濾。其實例有以下方法:
- setFilterIfMissing(boolean filterIfMissing) :預設值為 false,即如果該行數據不包含參考列,其依然被包含在最後的結果中;設置為 true 時,則不包含;
- setLatestVersionOnly(boolean latestVersionOnly) :預設為 true,即只檢索參考列的最新版本數據;設置為 false,則檢索所有版本數據。
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
"student".getBytes(),
"name".getBytes(),
CompareOperator.EQUAL,
new SubstringComparator("xiaolan"));
singleColumnValueFilter.setFilterIfMissing(true);
scan.setFilter(singleColumnValueFilter);
4.2 單列列值排除器 (SingleColumnValueExcludeFilter)
SingleColumnValueExcludeFilter
繼承自上面的 SingleColumnValueFilter
,過濾行為與其相反。
4.3 行鍵首碼過濾器 (PrefixFilter)
基於 RowKey 值決定某行數據是否被過濾。
PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("xxx"));
scan.setFilter(prefixFilter);
4.4 列名首碼過濾器 (ColumnPrefixFilter)
基於列限定符(列名)決定某行數據是否被過濾。
ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("xxx"));
scan.setFilter(columnPrefixFilter);
4.5 分頁過濾器 (PageFilter)
可以使用這個過濾器實現對結果按行進行分頁,創建 PageFilter 實例的時候需要傳入每頁的行數。
public PageFilter(final long pageSize) {
Preconditions.checkArgument(pageSize >= 0, "must be positive %s", pageSize);
this.pageSize = pageSize;
}
下麵的代碼體現了客戶端實現分頁查詢的主要邏輯,這裡對其進行一下解釋說明:
客戶端進行分頁查詢,需要傳遞 startRow
(起始 RowKey),知道起始 startRow
後,就可以返回對應的 pageSize 行數據。這裡唯一的問題就是,對於第一次查詢,顯然 startRow
就是表格的第一行數據,但是之後第二次、第三次查詢我們並不知道 startRow
,只能知道上一次查詢的最後一條數據的 RowKey(簡單稱之為 lastRow
)。
我們不能將 lastRow
作為新一次查詢的 startRow
傳入,因為 scan 的查詢區間是[startRow,endRow) ,即前開後閉區間,這樣 startRow
在新的查詢也會被返回,這條數據就重覆了。
同時在不使用第三方資料庫存儲 RowKey 的情況下,我們是無法通過知道 lastRow
的下一個 RowKey 的,因為 RowKey 的設計可能是連續的也有可能是不連續的。
由於 Hbase 的 RowKey 是按照字典序進行排序的。這種情況下,就可以在 lastRow
後面加上 0
,作為 startRow
傳入,因為按照字典序的規則,某個值加上 0
後的新值,在字典序上一定是這個值的下一個值,對於 HBase 來說下一個 RowKey 在字典序上一定也是等於或者大於這個新值的。
所以最後傳入 lastRow
+0
,如果等於這個值的 RowKey 存在就從這個值開始 scan,否則從字典序的下一個 RowKey 開始 scan。
25 個字母以及數字字元,字典排序如下:
'0' < '1' < '2' < ... < '9' < 'a' < 'b' < ... < 'z'
分頁查詢主要實現邏輯:
byte[] POSTFIX = new byte[] { 0x00 };
Filter filter = new PageFilter(15);
int totalRows = 0;
byte[] lastRow = null;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
if (lastRow != null) {
// 如果不是首行 則 lastRow + 0
byte[] startRow = Bytes.add(lastRow, POSTFIX);
System.out.println("start row: " +
Bytes.toStringBinary(startRow));
scan.withStartRow(startRow);
}
ResultScanner scanner = table.getScanner(scan);
int localRows = 0;
Result result;
while ((result = scanner.next()) != null) {
System.out.println(localRows++ + ": " + result);
totalRows++;
lastRow = result.getRow();
}
scanner.close();
//最後一頁,查詢結束
if (localRows == 0) break;
}
System.out.println("total rows: " + totalRows);
需要註意的是在多台 Regin Services 上執行分頁過濾的時候,由於並行執行的過濾器不能共用它們的狀態和邊界,所以有可能每個過濾器都會在完成掃描前獲取了 PageCount 行的結果,這種情況下會返回比分頁條數更多的數據,分頁過濾器就有失效的可能。
4.6 時間戳過濾器 (TimestampsFilter)
List<Long> list = new ArrayList<>();
list.add(1554975573000L);
TimestampsFilter timestampsFilter = new TimestampsFilter(list);
scan.setFilter(timestampsFilter);
4.7 首次行鍵過濾器 (FirstKeyOnlyFilter)
FirstKeyOnlyFilter
只掃描每行的第一列,掃描完第一列後就結束對當前行的掃描,並跳轉到下一行。相比於全表掃描,其性能更好,通常用於行數統計的場景,因為如果某一行存在,則行中必然至少有一列。
FirstKeyOnlyFilter firstKeyOnlyFilter = new FirstKeyOnlyFilter();
scan.set(firstKeyOnlyFilter);
五、包裝過濾器
包裝過濾器就是通過包裝其他過濾器以實現某些拓展的功能。
5.1 SkipFilter過濾器
SkipFilter
包裝一個過濾器,當被包裝的過濾器遇到一個需要過濾的 KeyValue 實例時,則拓展過濾整行數據。下麵是一個使用示例:
// 定義 ValueFilter 過濾器
Filter filter1 = new ValueFilter(CompareOperator.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("xxx")));
// 使用 SkipFilter 進行包裝
Filter filter2 = new SkipFilter(filter1);
5.2 WhileMatchFilter過濾器
WhileMatchFilter
包裝一個過濾器,當被包裝的過濾器遇到一個需要過濾的 KeyValue 實例時,WhileMatchFilter
則結束本次掃描,返回已經掃描到的結果。下麵是其使用示例:
Filter filter1 = new RowFilter(CompareOperator.NOT_EQUAL,
new BinaryComparator(Bytes.toBytes("rowKey4")));
Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
for (Cell cell : result.listCells()) {
System.out.println(cell);
}
}
scanner1.close();
System.out.println("--------------------");
// 使用 WhileMatchFilter 進行包裝
Filter filter2 = new WhileMatchFilter(filter1);
scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner1) {
for (Cell cell : result.listCells()) {
System.out.println(cell);
}
}
scanner2.close();
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0
rowKey5/student:name/1555035007051/Put/vlen=8/seqid=0
rowKey6/student:name/1555035007057/Put/vlen=8/seqid=0
rowKey7/student:name/1555035007062/Put/vlen=8/seqid=0
rowKey8/student:name/1555035007068/Put/vlen=8/seqid=0
rowKey9/student:name/1555035007073/Put/vlen=8/seqid=0
--------------------
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0
rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0
rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0
rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0
可以看到被包裝後,只返回了 rowKey4
之前的數據。
六、FilterList
以上都是講解單個過濾器的作用,當需要多個過濾器共同作用於一次查詢的時候,就需要使用 FilterList
。FilterList
支持通過構造器或者 addFilter
方法傳入多個過濾器。
// 構造器傳入
public FilterList(final Operator operator, final List<Filter> filters)
public FilterList(final List<Filter> filters)
public FilterList(final Filter... filters)
// 方法傳入
public void addFilter(List<Filter> filters)
public void addFilter(Filter filter)
多個過濾器組合的結果由 operator
參數定義 ,其可選參數定義在 Operator
枚舉類中。只有 MUST_PASS_ALL
和 MUST_PASS_ONE
兩個可選的值:
- MUST_PASS_ALL :相當於 AND,必須所有的過濾器都通過才認為通過;
- MUST_PASS_ONE :相當於 OR,只有要一個過濾器通過則認為通過。
@InterfaceAudience.Public
public enum Operator {
/** !AND */
MUST_PASS_ALL,
/** !OR */
MUST_PASS_ONE
}
使用示例如下:
List<Filter> filters = new ArrayList<Filter>();
Filter filter1 = new RowFilter(CompareOperator.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("XXX")));
filters.add(filter1);
Filter filter2 = new RowFilter(CompareOperator.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("YYY")));
filters.add(filter2);
Filter filter3 = new QualifierFilter(CompareOperator.EQUAL,
new RegexStringComparator("ZZZ"));
filters.add(filter3);
FilterList filterList = new FilterList(filters);
Scan scan = new Scan();
scan.setFilter(filterList);
參考資料
HBase: The Definitive Guide _> Chapter 4. Client API: Advanced Features
更多大數據系列文章可以參見 GitHub 開源項目: 大數據入門指南