## B+樹相關介紹 > B+樹是一棵**多叉排序樹**,即每個非葉子節點可以包含多個子節點,其整體結構呈扁平化,所以其非常適配於資料庫和操作系統的文件系統中。且B+樹能夠保持數據的穩定有序,插入和刪除都擁有較穩定的**對數時間複雜度**。 **B+樹的特性**:以 m 階為例,m 表示內部節點即非 ...
B+樹相關介紹
B+樹是一棵多叉排序樹,即每個非葉子節點可以包含多個子節點,其整體結構呈扁平化,所以其非常適配於資料庫和操作系統的文件系統中。且B+樹能夠保持數據的穩定有序,插入和刪除都擁有較穩定的對數時間複雜度。
B+樹的特性:以 m 階為例,m 表示內部節點即非葉子節點可以包含的最大子節點個數 maximumNum
- 若一個內部節點有 \(n(n <= m)\) 個子節點,則該內部節點應包含 \(n - 1\) 個關鍵字,也就是索引值
- 除根節點和葉子節點外,其他節點至少包含
Math.ceil(m / 2)
個子節點,這是因為B+樹的生成順序導致的- 最開始,B+樹只有一個根節點和若幹不超過m個的葉子節點;
- 逐漸添加,導致葉子節點超過m時,此時根節點的子節點個數大於m,不符合要求,需要分裂
- 分裂則導致增加2個內部節點,其中一個內部節點個數為
(m+1)/2
,另一個為(m+2)/2
- 其他內部節點也是如此規律形成,所以所有內部節點的子節點個數均大於
Math.ceil(m / 2)
- 內部節點即對應索引部分,節點中僅包含子樹中最大/最小的索引值
- 葉子節點即對應數據部分,節點中不僅包含索引值,也包含其他的值信息
- 最底層所有葉子節點通過雙向鏈表串聯,優化範圍查詢
B+樹實現
目前實現的B+樹的簡易版,葉子節點是存儲的Entry<Integer, String>
鍵值對,內部節點存儲的是Integer
索引,後續有時間再進行泛型的通用擴展。
節點定義
抽象公共父類 Node
package bplustree;
public abstract class Node {
InternalNode parent; // 父節點
public Node() {}
public abstract boolean isValid(); // 判斷刪除節點後各B+樹節點是否滿足要求
public abstract boolean isAvailable(); // 判斷B+樹節點是否可以分裂節點給其他節點
public abstract boolean isMergeable(); // 判斷B+樹節點是否可以和其他節點合併
}
內部節點定義
public class InternalNode extends Node{
int maxChildNodes; // 子節點個數最大值 m,m為階數
int minChildNodes; // 除根節點及葉子節點外,子節點個數最小值 ceil(m / 2)
int curNodesNum; // 內部節點當前的子節點個數
InternalNode leftSibling; // 左兄弟節點
InternalNode rightSibling; // 右兄弟節點
Integer[] keys; // 內部節點當前的索引值,最多有 m - 1 個
Node[] childPointers; // 內部節點當前的子節點,最多有 m 個
}
葉子節點定義
public class LeafNode extends Node {
int maximumNum; // 葉子節點最多元素個數 m - 1
int minimumNum;
int curNum; // 葉子節點當前的元素個數
LeafNode leftSibling; // 左兄弟和右兄弟形成雙向鏈表
LeafNode rightSibling;
Entry[] entries; // 葉子節點鍵值對,不僅存儲索引值,也存儲其他值信息
}
class Entry implements Comparable<Entry> {
Integer key;
String value;
public Entry(Integer key, String value) {
this.key = key;
this.value = value;
}
@Override
public int compareTo(Entry o) {
return key.compareTo(o.key);
}
}
B+樹定義
public class BPlusTree {
int m;
InternalNode root; // B+樹根節點
LeafNode head; // 葉子節點的首元素
}
查詢操作
單值查詢
B+樹的查找過程:根據查找的索引值k,從根節點向葉子節點搜索,對數時間複雜度。
public String search(int key) {
if (isEmpty()) {
return null;
}
// 樹查找
LeafNode leafNode = (this.root == null) ? this.head : findLeafNode(key);
Entry[] entries = leafNode.entries;
// 葉子節點內進行二分查找
int index = binarySearch(entries, leafNode.curNum, key, null);
if (index == -1) {
return null;
}
else {
return entries[index]
}
}
// 從根節點開始查找
private LeafNode findLeafNode(Integer key) {
return findLeafNode(root, key);
}
// 找到索引值所在的葉子節點
private LeafNode findLeafNode(InternalNode internalNode, Integer key) {
Integer[] keys = internalNode.keys;
int i;
for (i = 0; i < internalNode.curNodesNum - 1; i++) {
if (key.compareTo(keys[i]) < 0) {
break;
}
}
Object child = internalNode.childPointers[i];
if (child instanceof LeafNode) {
return (LeafNode) child;
}
else {
return findLeafNode((InternalNode) child, key);
}
}
區間查詢
B+樹區間查詢左值可能在的葉子節點位置,然後通過雙向鏈表向後遍歷。
// 閉區間 [left, right]
public ArrayList<String> searchRange(int left, int right) {
List<String> values = new ArrayList<>();
LeafNode leafNode = findLeafNode(left); // 查找左值可能存在的位置,並從該位置向後遍歷
boolean flag = true;
while (leafNode != null && flag) {
Entry[] entries = leafNode.entries;
for (Entry entry : entries) {
if (entry == null) {
break;
}
if ( entry.key > right) {
flag = false;
break;
}
if (left <= entry.key && right >= entry.key) {
values.add(entry.value);
}
}
leafNode = leafNode.rightSibling;
}
return values;
}
插入操作
B+樹的插入操作僅在葉子節點進行:
- 若為空樹,則創建一個葉子節點,該葉子節點同時也是根節點,插入操作結束;
- 根據插入的 key 值,找到應該在的葉子節點插入;
- 若插入後葉子節點個數符合要求即小於m,則插入結束
- 若插入後葉子節點個數不符合要求即大於等於m,將該節點分裂成兩半,則判斷當前葉子節點是否為根節點
- 若當前葉子節點為根節點,則構建一個新的root節點,指向分裂後的兩個子節點
- 若當前葉子節點不為根節點,則在父節點處添加一個新的子節點,新子節點則存儲原節點一半的值,並迴圈向上判斷中間節點是否滿足要求
public void insert(int key, String value) {
if (isEmpty()) {
this.head = new LeafNode(this.m, new Entry(key, value));
}
else {
LeafNode leafNode = (this.root == null) ? this.head : findLeafNode(key);
// 插入葉子節點失敗,即葉子節點中存儲已到達上限
if (!leafNode.insert(new Entry(key, value))) {
leafNode.entries[leafNode.curNum++] = new Entry(key, value);
sortEntries(leafNode.entries);
// 葉子節點分裂的位置
int mid = getIndexOfMidPointer();
Entry[] halfEntry = splitEntries(leafNode, mid);
// 若葉子節點為根節點,即parent為null
if (leafNode.parent == null) {
Integer[] parent_keys = new Integer[m];
parent_keys[0] = halfEntry[0].key;
// 創建新的 root
InternalNode parent = new InternalNode(m, parent_keys);
leafNode.parent = parent;
parent.appendChildPointer(leafNode);
}
// 若葉子節點不為根節點
else {
int newParentKey = halfEntry[0].key;
leafNode.parent.keys[leafNode.parent.curNodesNum - 1] = newParentKey;
Arrays.sort(leafNode.parent.keys, 0, leafNode.parent.curNodesNum);
}
// 分裂後的另一半葉子節點,添加到父節點
LeafNode newLeafNode = new LeafNode(this.m, halfEntry, leafNode.parent);
// 分裂後的另一半葉子節點對應的下標
int index = leafNode.parent.getIndexOfPointer(leafNode) + 1;
for (int i = index; i < leafNode.parent.childPointers.length - 1; i++) {
leafNode.parent.childPointers[i + 1] = leafNode.parent.childPointers[i];
}
leafNode.parent.childPointers[index] = newLeafNode;
// 關聯兄弟節點
newLeafNode.rightSibling = leafNode.rightSibling;
if (newLeafNode.rightSibling != null) {
newLeafNode.rightSibling.leftSibling = newLeafNode;
}
leafNode.rightSibling = newLeafNode;
newLeafNode.leftSibling = leafNode;
if (this.root == null) {
this.root = leafNode.parent;
}
else {
// 逐漸上浮,判斷插入是否會導致B+樹內部節點不符合要求
InternalNode internalNode = leafNode.parent;
while (internalNode != null) {
if (internalNode.isOverfull()) {
splitInternalNode(internalNode);
}
else {
break;
}
internalNode = internalNode.parent;
}
}
}
}
}
/**
* 葉子節點插入,導致的上層內部節點分裂
*/
private void splitInternalNode(InternalNode internalNode) {
InternalNode parent = internalNode.parent;
int mid = getIndexOfMidPointer();
Integer newParentKey = internalNode.keys[mid];
// 內部節點的 node 分裂
Node[] halfPointers = splitChildPointers(internalNode, mid);
// 內部節點的 key 分裂
Integer[] halfKeys = splitKeys(internalNode.keys, mid);
// 分裂後內部節點的子節點個數
internalNode.curNodesNum = linearNullSearch(internalNode.childPointers);
InternalNode sibling = new InternalNode(this.m, halfKeys, halfPointers);
for (Node pointer : halfPointers) {
if (pointer != null) {
pointer.parent = sibling;
}
}
sibling.rightSibling = internalNode.rightSibling;
internalNode.rightSibling = sibling;
sibling.leftSibling = internalNode;
if (sibling.rightSibling != null) {
sibling.rightSibling.leftSibling = sibling;
}
// root node
if (parent == null) {
Integer[] keys = new Integer[this.m];
keys[0] = newParentKey;
InternalNode newRoot = new InternalNode(this.m, keys);
newRoot.appendChildPointer(internalNode);
newRoot.appendChildPointer(sibling);
this.root = newRoot;
internalNode.parent = newRoot;
sibling.parent = newRoot;
}
else {
parent.keys[parent.curNodesNum - 1] = newParentKey;
Arrays.sort(parent.keys, 0, parent.curNodesNum);
int index = parent.getIndexOfPointer(internalNode) + 1;
parent.insertChildPointer(sibling, index);
sibling.parent = parent;
}
}
private Node[] splitChildPointers(InternalNode node, int split) {
Node[] pointers = node.childPointers;
Node[] newPointers = new Node[this.m + 1];
for (int i = split + 1; i < pointers.length; i++) {
newPointers[i - split - 1] = pointers[i];
node.removePointer(i);
}
return newPointers;
}
private Integer[] splitKeys(Integer[] keys, int split) {
Integer[] newKeys = new Integer[m];
keys[split] = null;
for (int i = split + 1; i < keys.length; i++) {
newKeys[i - split] = keys[i];
keys[i] = null;
}
return newKeys;
}
刪除操作
B+樹的刪除操作僅在葉子節點進行:
- 若刪除後,葉子節點中的索引個數仍然滿足要求即大於等於
Math.ceil(m / 2)
時,將該葉子節點的其他索引左移一位,刪除結束;- 若刪除後,葉子節點中的索引個數不滿足最低要求,則查詢左右兄弟節點:
- 若左/右兄弟節點中索引個數大於
Math.ceil(m / 2)
,則從左/右兄弟節點中移動一個索引項到當前葉子節點中,並修改父節點的索引值,刪除結束- 若左/右兄弟節點中索引個數等於
Math.ceil(m / 2)
,則將左/右節點與當前節點合併,修改父節點的索引記錄,並向上逐級判斷內部節點是否因為頁合併導致索引項不滿足最低要求,刪除結束
public void delete(int key) {
if (isEmpty()) {
System.out.println("Invalid: The B+ tree is empty!");
}
else {
LeafNode leafNode = this.root == null ? this.head : findLeafNode(key);
int index = binarySearch(leafNode.entries, leafNode.curNum, key, null);
if (index < 0) {
System.out.println("Invalid: The key does not exist in the B+ tree!");
}
else {
leafNode.deleteAtIndex(index);
// 刪除後,葉子節點仍然滿足要求,刪除結束
if (leafNode.isValid()) {
LeafNode sibling;
InternalNode parent = leafNode.parent;
// 刪除後,葉子節點不滿足要求,左兄弟節點可以移動一個索引項到當前葉子節點
if (leafNode.leftSibling != null && leafNode.leftSibling.parent == parent && leafNode.leftSibling.isAvailable()) {
sibling = leafNode.leftSibling;
Entry entry = sibling.entries[sibling.curNum - 1];
leafNode.insert(entry);
sortEntries(leafNode.entries);
sibling.deleteAtIndex(sibling.curNum - 1);
// 更新 parent 的 key
int pointIndex = getIndexOfLeafNode(parent.childPointers, leafNode);
if (entry.key < parent.keys[pointIndex - 1]) {
parent.keys[pointIndex - 1] = entry.key;
}
}
// 刪除後,葉子節點不滿足要求,右兄弟節點可以移動一個索引項到當前葉子節點
else if (leafNode.rightSibling != null && leafNode.rightSibling.parent == parent && leafNode.rightSibling.isAvailable()) {
sibling = leafNode.rightSibling;
Entry entry = sibling.entries[0];
leafNode.insert(entry);
sortEntries(leafNode.entries);
sibling.deleteAtIndex(0);
// 更新 parent 的 key
int pointIndex = getIndexOfLeafNode(parent.childPointers, leafNode);
if (entry.key > parent.keys[pointIndex]) {
parent.keys[pointIndex] = entry.key;
}
}
// 刪除後,葉子節點不滿足要求,左兄弟節點可以與當前葉子節點合併
else if (leafNode.leftSibling != null && leafNode.leftSibling.parent == parent && leafNode.leftSibling.isMergeable()) {
sibling = leafNode.leftSibling;
int pointIndex = getIndexOfLeafNode(parent.childPointers, leafNode);
parent.removeKey(pointIndex - 1);
parent.removePointer(leafNode);
sibling.rightSibling = leafNode.rightSibling;
if (parent.isValid()) {
handleDeficiency(parent);
}
}
// 刪除後,葉子節點不滿足要求,右兄弟節點可以與當前葉子節點合併
else if (leafNode.rightSibling != null && leafNode.rightSibling.parent == parent && leafNode.rightSibling.isMergeable()) {
sibling = leafNode.rightSibling;
int pointIndex = getIndexOfLeafNode(parent.childPointers, leafNode);
parent.removeKey(pointIndex);
parent.removePointer(leafNode);
sibling.leftSibling = leafNode.leftSibling;
if (sibling.leftSibling == null) {
this.head = sibling;
}
// 逐級向上層判斷是否滿足要求
if (parent.isValid()) {
handleDeficiency(parent);
}
}
// 刪除後,B+樹為空
else if (this.root == null && this.head.curNum == 0) {
this.head = null;
}
}
}
}
}
/**
* 處理不滿足要求的內部節點
* @param internalNode
*/
private void handleInvalidInternalNode(InternalNode internalNode) {
InternalNode sibling;
InternalNode parent = internalNode.parent;
// 當前內部節點為根節點
if (root == internalNode) {
for (int i = 0; i < internalNode.childPointers.length; i++) {
if (internalNode.childPointers[i] != null) {
if (internalNode.childPointers[i] instanceof InternalNode) {
root = (InternalNode) internalNode.childPointers[i];
root.parent = null;
}
else if (internalNode.childPointers[i] instanceof LeafNode) {
root = null;
}
}
}
}
// 左兄弟節點可以移動索引項
else if (internalNode.leftSibling != null && internalNode.leftSibling.isAvailable()) {
sibling = internalNode.leftSibling;
Integer key = sibling.keys[internalNode.curNodesNum - 2];
Node pointer = sibling.childPointers[internalNode.curNodesNum - 1];
shiftKeys(internalNode.keys, 1);
shiftPointers(internalNode.childPointers, 1);
internalNode.keys[0] = key;
internalNode.childPointers[0] = pointer;
sibling.removePointer(pointer);
}
// 右兄弟節點可以移動索引項
else if (internalNode.rightSibling != null && internalNode.rightSibling.isAvailable()) {
sibling = internalNode.rightSibling;
Integer key = sibling.keys[0];
Node pointer = sibling.childPointers[0];
internalNode.keys[internalNode.curNodesNum - 1] = parent.keys[0];
internalNode.childPointers[internalNode.curNodesNum] = pointer;
parent.keys[0] = key;
sibling.removePointer(0);
shiftPointers(sibling.childPointers, -1);
}
// 左兄弟節點可以合併
else if (internalNode.leftSibling != null && internalNode.leftSibling.isMergeable()) {
sibling = internalNode.leftSibling;
int index = -1;
for (int i = 0; i < parent.childPointers.length; i++) {
if (parent.childPointers[i] == internalNode) {
index = i;
break;
}
}
parent.keys[index - 1] = parent.keys[index];
for (int i = index; i < parent.keys.length - 1; i++) {
parent.keys[i] = parent.keys[i + 1];
}
shiftPointers(internalNode.childPointers, (int) Math.ceil(m / 2.0));
for (int i = 0; i < (int) Math.ceil(m / 2.0); i++) {
internalNode.childPointers[i] = sibling.childPointers[i];
}
internalNode.leftSibling = sibling.leftSibling;
if (internalNode.leftSibling != null) {
internalNode.leftSibling.rightSibling = internalNode;
}
if (parent != null && parent.isValid()) {
handleInvalidInternalNode(parent);
}
}
// 右兄弟節點可以合併
else if (internalNode.rightSibling != null && internalNode.rightSibling.isMergeable()) {
sibling = internalNode.rightSibling;
int index = -1;
for (int i = 0; i < parent.childPointers.length; i++) {
if (internalNode == parent.childPointers[i]) {
index = i;
break;
}
}
parent.keys[index] = parent.keys[index + 1];
for (int i = index + 2; i < parent.keys.length; i++) {
parent.keys[i - 1] = parent.keys[i];
}
for (int i = 0; i < (int) Math.ceil(m / 2.0); i++) {
internalNode.childPointers[internalNode.curNodesNum++] = sibling.childPointers[i];
}
internalNode.rightSibling = sibling.rightSibling;
if (internalNode.rightSibling != null) {
internalNode.rightSibling.leftSibling = internalNode;
}
if (parent != null && parent.isValid()) {
handleInvalidInternalNode(parent);
}
}
}
參考文章: