[TOC] 一. 簡述一致性哈希演算法 這裡不詳細介紹一致性哈希演算法的起源了, 網上能方便地搜到許多介紹一致性哈希演算法的好文章. 本文主要想動手實現一致性哈希演算法, 並搭建一個環境進行實戰測試. 在開始之前先整理一下 演算法的思路 : 一致性哈希演算法通過把每台伺服器的哈希值打在哈希環上, 把哈希環分成不 ...
目錄
一. 簡述一致性哈希演算法
- 這裡不詳細介紹一致性哈希演算法的起源了, 網上能方便地搜到許多介紹一致性哈希演算法的好文章. 本文主要想動手實現一致性哈希演算法, 並搭建一個環境進行實戰測試.
- 在開始之前先整理一下演算法的思路:
- 一致性哈希演算法通過把每台伺服器的哈希值打在哈希環上, 把哈希環分成不同的段, 然後對到來的請求計算哈希值從而得知該請求所歸屬的伺服器. 這個辦法解決了傳統伺服器增減機器時需要重新計算哈希的麻煩.
- 但如果伺服器的數量較少, 可能導致計算出的哈希值相差較小, 在哈希環上分佈不均勻, 導致某台伺服器過載. 為瞭解決負載均衡問題, 我們引入虛擬節點技術, 為每台伺服器分配一定數量的節點, 通過節點的哈希值在哈希環上進行劃分. 這樣一來, 我們就可以根據機器的性能為其分配節點, 性能好就多分配一點, 差就少一點, 從而達到負載均衡.
二. 實現一致性哈希演算法.
- 奠定了整體思路後我們開始考慮實現的細節
哈希演算法的選擇
- 選擇能散列出32位整數的FNV演算法, 由於該哈希函數可能產生負數, 需要作取絕對值處理.
請求節點在哈希環上尋找對應伺服器的策略
- 策略為: 新節點尋找最近比且它大的節點, 比如說現在已經有環[0, 5, 7, 10], 來了個哈希值為6的節點, 那麼它應該由哈希值為7對應的伺服器處理. 如果請求節點所計算的哈希值大於環上的所有節點, 那麼就取第一個節點. 比如來了個11, 將分配到0所對應的節點.
哈希環的組織結構
- 開始的時候想過用順序存儲的結構存放, 但是在一致性哈希中, 最頻繁的操作是在集合中查找最近且比目標大的數. 如果用順序存儲結構的話, 時間複雜度是收斂於O(N)的, 而樹形結構則為更優的O(logN).
- 但凡事有兩面, 採用樹形結構存儲的代價是數據初始化的效率較低, 而且運行期間如果有節點插入刪除的話效率也比較低. 但是在現實中, 伺服器在一開始註冊後基本上就不怎麼變了, 期間增減機器, 宕機, 機器修複等事件的頻率相比起節點的查詢簡直是微不足道. 所以本案例決定使用使用樹形結構存儲.
- 貼合上述要求, 並且提供有序存儲的首先想到的是紅黑樹, 而且Java中提供了紅黑樹的實現
TreeMap
.
虛擬節點與真實節點的映射關係
如何確定一個虛擬節點對應的真實節點也是個問題. 理論上應該維護一張表記錄真實節點與虛擬節點的映射關係. 本引入案例為了演示採用簡單的字元串處理. 比方說伺服器
192.168.0.1:8888
分配了1000個虛擬節點, 那麼它的虛擬節點名稱從192.168.0.1:8888@1
一直到192.168.0.1:8888@1000
. 通過這樣的處理, 我們在通過虛擬節點找真實節點時只需要裁剪字元串即可.- 計劃定製好後, 下麵開始懟代碼
public class ConsistentHashTest {
/**
* 伺服器列表,一共有3台伺服器提供服務, 將根據性能分配虛擬節點
*/
public static String[] servers = {
"192.168.0.1#100", //伺服器1: 性能指數100, 將獲得1000個虛擬節點
"192.168.0.2#100", //伺服器2: 性能指數100, 將獲得1000個虛擬節點
"192.168.0.3#30" //伺服器3: 性能指數30, 將獲得300個虛擬節點
};
/**
* 真實伺服器列表, 由於增加與刪除的頻率比遍歷高, 用鏈表存儲比較划算
*/
private static List<String> realNodes = new LinkedList<>();
/**
* 虛擬節點列表
*/
private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();
static{
for(String s : servers){
//把伺服器加入真實伺服器列表中
realNodes.add(s);
String[] strs = s.split("#");
//伺服器名稱, 省略埠號
String name = strs[0];
//根據伺服器性能給每台真實伺服器分配虛擬節點, 並把虛擬節點放到虛擬節點列表中.
int virtualNodeNum = Integer.parseInt(strs[1]) * 10;
for(int i = 1; i <= virtualNodeNum; i++){
virtualNodes.put(FVNHash(name + "@" + i), name + "@" + i);
}
}
}
public static void main(String[] args) {
new Thread(new RequestProcess()).start();
}
static class RequestProcess implements Runnable{
@Override
public void run() {
String client = null;
while(true){
//模擬產生一個請求
client = getN() + "." + getN() + "." + getN() + "." + getN() + ":" + (1000 + (int)(Math.random() * 9000));
//計算請求的哈希值
int hash = FVNHash(client);
//判斷請求將由哪台伺服器處理
System.out.println(client + " 的請求將由 " + getServer(client) + " 處理");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static String getServer(String client) {
//計算客戶端請求的哈希值
int hash = FVNHash(client);
//得到大於該哈希值的所有map集合
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
//找到比該值大的第一個虛擬節點, 如果沒有比它大的虛擬節點, 根據哈希環, 則返回第一個節點.
Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
//通過該虛擬節點獲得真實節點的名稱
String virtualNodeName = virtualNodes.get(targetKey);
String realNodeName = virtualNodeName.split("@")[0];
return realNodeName;
}
public static int getN(){
return (int)(Math.random() * 128);
}
public static int FVNHash(String data){
final int p = 16777619;
int hash = (int)2166136261L;
for(int i = 0; i < data.length(); i++)
hash = (hash ^ data.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
return hash < 0 ? Math.abs(hash) : hash;
}
}
/* 運行結果片段
55.1.13.47:6240 的請求將由 192.168.0.1 處理
5.49.56.126:1105 的請求將由 192.168.0.1 處理
90.41.8.88:6884 的請求將由 192.168.0.2 處理
26.107.104.81:2989 的請求將由 192.168.0.2 處理
114.66.6.56:8233 的請求將由 192.168.0.1 處理
123.74.52.94:5523 的請求將由 192.168.0.1 處理
104.59.60.2:7502 的請求將由 192.168.0.2 處理
4.94.30.79:1299 的請求將由 192.168.0.1 處理
10.44.37.73:9332 的請求將由 192.168.0.2 處理
115.93.93.82:6333 的請求將由 192.168.0.2 處理
15.24.97.66:9177 的請求將由 192.168.0.2 處理
100.39.98.10:1023 的請求將由 192.168.0.2 處理
61.118.87.26:5108 的請求將由 192.168.0.2 處理
17.79.104.35:3901 的請求將由 192.168.0.1 處理
95.36.5.25:8020 的請求將由 192.168.0.2 處理
126.74.56.71:7792 的請求將由 192.168.0.2 處理
14.63.56.45:8275 的請求將由 192.168.0.1 處理
58.53.44.71:2089 的請求將由 192.168.0.3 處理
80.64.57.43:6144 的請求將由 192.168.0.2 處理
46.65.4.18:7649 的請求將由 192.168.0.2 處理
57.35.27.62:9607 的請求將由 192.168.0.2 處理
81.114.72.3:3444 的請求將由 192.168.0.1 處理
38.18.61.26:6295 的請求將由 192.168.0.2 處理
71.75.18.82:9686 的請求將由 192.168.0.2 處理
26.11.98.111:3781 的請求將由 192.168.0.1 處理
62.86.23.37:8570 的請求將由 192.168.0.3 處理
*/
- 經過上面的測試我們可以看到性能較好的伺服器1和伺服器2分擔了大部分的請求, 只有少部分請求落到了性能較差的伺服器3上, 已經初步實現了負載均衡.
- 下麵我們將結合zookeeper, 搭建一個更加逼真的伺服器集群, 看看在部分伺服器上線下線的過程中, 一致性哈希演算法是否仍能夠實現負載均衡.
三. 結合zookeeper搭建環境
環境介紹
- 首先會通過啟動多台虛擬機模擬伺服器集群, 各台伺服器都提供一個相同的介面供消費者消費.
- 同時會有一個消費者線程不斷地向伺服器集群發起請求, 這些請求會經過一致性哈希演算法均衡負載到各個伺服器.
- 為了能夠模擬上述場景, 我們必須在客戶端維護一個伺服器列表, 使得客戶端能夠通過一致性哈希演算法選擇伺服器發送. (現實中可能會把一致性哈希演算法實現在前端伺服器, 客戶先訪問前端伺服器, 再路由到後端伺服器集群).
- 但是我們的重點是模擬伺服器的宕機和上線, 看看一致性哈希演算法是否仍能實現負載均衡. 所以客戶端必須能夠感知伺服器端的變化並動態地調整它的伺服器列表.
- 為了完成這項工作, 我們引入
zookeeper
,zookeeper
的數據一致性演算法保證數據實時, 準確, 客戶端能夠通過zookeeper
得知實時的伺服器情況. - 具體操作是這樣的: 伺服器集群先以臨時節點的方式連接到
zookeeper
, 併在zookeeper
上註冊自己的介面服務(註冊節點). 客戶端連接上zookeeper
後, 把已註冊的節點(伺服器)添加到自己的伺服器列表中. - 如果有伺服器宕機的話, 由於當初註冊的是瞬時節點的原因, 該台伺服器節點會從
zookeeper
中註銷. 客戶端監聽到伺服器節點有變時, 也會動態調整自己的伺服器列表, 把當宕機的伺服器從伺服器列表中刪除, 因此不會再向該伺服器發送請求, 負載均衡的任務將交到剩餘的機器身上. - 當有伺服器從新連接上集群後, 客戶端的伺服器列表也會更新, 哈希環也將做出相應的變化以提供負載均衡.
具體操作:
I. 搭建zookeeper
集群環境:
- 創建3個
zookeeper
服務, 構成集群. 在各自的data
文件夾中添加一個myid
文件, 各個id分別為1, 2, 3
.
- 重新複製一份配置文件, 在配置文件中配置各個
zookeeper
的埠號. 本案例中三台zookeeper
分別在2181, 2182, 2183
埠
啟動
zookeeper
集群由於zookeeper不是本案例的重點, 細節暫不展開講了.
II. 創建伺服器集群, 提供RPC遠程調用服務
- 首先創建一個伺服器項目(使用Maven), 添加
zookeeper
依賴 - 創建常量介面, 用於存儲連接
zookeeper
的信息
public interface Constant {
//zookeeper集群的地址
String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
//連接zookeeper的超時時間
int ZK_TIME_OUT = 5000;
//伺服器所發佈的遠程服務在zookeeper中的註冊地址, 也就是說這個節點中保存了各個伺服器提供的介面
String ZK_REGISTRY = "/provider";
//zookeeper集群中註冊服務的url地址的瞬時節點
String ZK_RMI = ZK_REGISTRY + "/rmi";
}
3.封裝操作zookeeper
和發佈遠程服務的介面供自己調用, 本案例中發佈遠程服務使用Java自身提供的rmi
包完成, 如果沒有瞭解過可以參考這篇
public class ServiceProvider {
private CountDownLatch latch = new CountDownLatch(1);
/**
* 連接zookeeper集群
*/
public ZooKeeper connectToZK(){
ZooKeeper zk = null;
try {
zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//如果連接上了就喚醒當前線程.
latch.countDown();
}
});
latch.await();//還沒連接上時當前線程等待
} catch (Exception e) {
e.printStackTrace();
}
return zk;
}
/**
* 創建znode節點
* @param zk
* @param url 節點中寫入的數據
*/
public void createNode(ZooKeeper zk, String url){
try{
//要把寫入的數據轉化為位元組數組
byte[] data = url.getBytes();
zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 發佈rmi服務
*/
public String publishService(Remote remote, String host, int port){
String url = null;
try{
LocateRegistry.createRegistry(port);
url = "rmi://" + host + ":" + port + "/rmiService";
Naming.bind(url, remote);
} catch (Exception e) {
e.printStackTrace();
}
return url;
}
/**
* 發佈rmi服務, 並且將服務的url註冊到zookeeper集群中
*/
public void publish(Remote remote, String host, int port){
//調用publishService, 得到服務的url地址
String url = publishService(remote, host, port);
if(null != url){
ZooKeeper zk = connectToZK();//連接到zookeeper
if(null != zk){
createNode(zk, url);
}
}
}
}
- 自定義遠程服務. 服務提供一個簡單的方法: 客戶端發來一個字元串, 伺服器在字元串前面添加上
Hello
, 並返回字元串.
//UserService
public interface UserService extends Remote {
public String helloRmi(String name) throws RemoteException;
}
//UserServiceImpl
public class UserServiceImpl implements UserService {
public UserServiceImpl() throws RemoteException{
super();
}
@Override
public String helloRmi(String name) throws RemoteException {
return "Hello " + name + "!";
}
}
- 修改埠號, 啟動多個java虛擬機, 模擬伺服器集群. 為了方便演示, 自定義7777, 8888, 9999埠開啟3個伺服器進程, 到時會模擬7777埠的伺服器宕機和修複重連.
public static void main(String[] args) throws RemoteException {
//創建工具類對象
ServiceProvider sp = new ServiceProvider();
//創建遠程服務對象
UserService userService = new UserServiceImpl();
//完成發佈
sp.publish(userService, "localhost", 9999);
}
III. 編寫客戶端程式(運用一致性哈希演算法實現負載均衡
- 封裝客戶端介面.
public class ServiceConsumer {
/**
* 提供遠程服務的伺服器列表, 只記錄遠程服務的url
*/
private volatile List<String> urls = new LinkedList<>();
/**
* 遠程服務對應的虛擬節點集合
*/
private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();
public ServiceConsumer(){
ZooKeeper zk = connectToZK();//客戶端連接到zookeeper
if(null != zk){
//連接上後關註zookeeper中的節點變化(伺服器變化)
watchNode(zk);
}
}
private void watchNode(final ZooKeeper zk) {
try{
//觀察/provider節點下的子節點是否有變化(是否有伺服器登入或登出)
List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//如果伺服器節點有變化就重新獲取
if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
System.out.println("伺服器端有變化, 可能有舊伺服器宕機或者新伺服器加入集群...");
watchNode(zk);
}
}
});
//將獲取到的伺服器節點數據保存到集合中, 也就是獲得了遠程服務的訪問url地址
List<String> dataList = new LinkedList<>();
TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();
for(String nodeStr : nodeList){
byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
//放入伺服器列表的url
String url = new String(data);
//為每個伺服器分配虛擬節點, 為了方便模擬, 預設開啟在9999埠的伺服器性能較差, 只分配300個虛擬節點, 其他分配1000個.
if(url.contains("9999")){
for(int i = 1; i <= 300; i++){
newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
}
}else{
for(int i = 1; i <= 1000; i++){
newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
}
}
dataList.add(url);
}
urls = dataList;
virtualNodes = newVirtualNodesList;
dataList = null;//好讓垃圾回收器儘快收集
newVirtualNodesList = null;
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 根據url獲得遠程服務對象
*/
public <T> T lookUpService(String url){
T remote = null;
try{
remote = (T)Naming.lookup(url);
} catch (Exception e) {
//如果該url連接不上, 很有可能是該伺服器掛了, 這時使用伺服器列表中的第一個伺服器url重新獲取遠程對象.
if(e instanceof ConnectException){
if (urls.size() != 0){
url = urls.get(0);
return lookUpService(url);
}
}
}
return remote;
}
/**
* 通過一致性哈希演算法, 選取一個url, 最後返回一個遠程服務對象
*/
public <T extends Remote> T lookUp(){
T service = null;
//隨機計算一個哈希值
int hash = FVNHash(Math.random() * 10000 + "");
//得到大於該哈希值的所有map集合
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
//找到比該值大的第一個虛擬節點, 如果沒有比它大的虛擬節點, 根據哈希環, 則返回第一個節點.
Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
//通過該虛擬節點獲得伺服器url
String virtualNodeName = virtualNodes.get(targetKey);
String url = virtualNodeName.split("@")[0];
//根據伺服器url獲取遠程服務對象
service = lookUpService(url);
System.out.print("提供本次服務的地址為: " + url + ", 返回結果: ");
return service;
}
private CountDownLatch latch = new CountDownLatch(1);
public ZooKeeper connectToZK(){
ZooKeeper zk = null;
try {
zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//判斷是否連接zk集群
latch.countDown();//喚醒處於等待狀態的當前線程
}
});
latch.await();//沒有連接上的時候當前線程處於等待狀態.
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return zk;
}
public static int FVNHash(String data){
final int p = 16777619;
int hash = (int)2166136261L;
for(int i = 0; i < data.length(); i++)
hash = (hash ^ data.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
return hash < 0 ? Math.abs(hash) : hash;
}
}
- 啟動客戶端進行測試
public static void main(String[] args){
ServiceConsumer sc = new ServiceConsumer();//創建工具類對象
while(true){
//獲得rmi遠程服務對象
UserService userService = sc.lookUp();
try{
//調用遠程方法
String result = userService.helloRmi("炭燒生蚝");
System.out.println(result);
Thread.sleep(100);
}catch(Exception e){
e.printStackTrace();
}
}
}
客戶端跑起來後, 在顯示台不斷進行列印...下麵將對數據進行統計.
IV. 對伺服器調用數據進行統計分析
- 重溫一遍模擬的過程: 首先分別在7777, 8888, 9999埠啟動了3台伺服器. 然後啟動客戶端進行訪問. 7777, 8888埠的兩台伺服器設置性能指數為1000, 而9999埠的伺服器性能指數設置為300.
- 在客戶端運行期間, 我手動關閉了8888埠的伺服器, 客戶端正常列印出伺服器變化信息. 此時理論上不會有訪問被路由到8888埠的伺服器. 當我重新啟動8888埠伺服器時, 客戶端列印出伺服器變化信息, 訪問能正常到達8888埠伺服器.
- 下麵對各伺服器的訪問量進行統計, 看是否實現了負載均衡.
- 測試程式如下:
public class DataStatistics {
private static float ReqToPort7777 = 0;
private static float ReqToPort8888 = 0;
private static float ReqToPort9999 = 0;
public static void main(String[] args) {
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader("C://test.txt"));
String line = null;
while(null != (line = br.readLine())){
if(line.contains("7777")){
ReqToPort7777++;
}else if(line.contains("8888")){
ReqToPort8888++;
}else if(line.contains("9999")){
ReqToPort9999++;
}else{
print(false);
}
}
print(true);
} catch (Exception e) {
e.printStackTrace();
}finally {
if(null != br){
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
br = null;
}
}
}
private static void print(boolean isEnd){
if(!isEnd){
System.out.println("------------- 伺服器集群發生變化 -------------");
}else{
System.out.println("------------- 最後一次統計 -------------");
}
System.out.println("截取自上次伺服器變化到現在: ");
float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
System.out.println("7777埠伺服器訪問量為: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
System.out.println("8888埠伺服器訪問量為: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
System.out.println("9999埠伺服器訪問量為: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
ReqToPort7777 = 0;
ReqToPort8888 = 0;
ReqToPort9999 = 0;
}
}
/* 以下是輸出結果
------------- 伺服器集群發生變化 -------------
截取自上次伺服器變化到現在:
7777埠伺服器訪問量為: 198.0, 占比0.4419643
8888埠伺服器訪問量為: 184.0, 占比0.4107143
9999埠伺服器訪問量為: 66.0, 占比0.14732143
------------- 伺服器集群發生變化 -------------
截取自上次伺服器變化到現在:
7777埠伺服器訪問量為: 510.0, 占比0.7589286
8888埠伺服器訪問量為: 1.0, 占比0.0014880953
9999埠伺服器訪問量為: 161.0, 占比0.23958333
------------- 最後一次統計 -------------
截取自上次伺服器變化到現在:
7777埠伺服器訪問量為: 410.0, 占比0.43248945
8888埠伺服器訪問量為: 398.0, 占比0.41983122
9999埠伺服器訪問量為: 140.0, 占比0.14767933
*/
V. 結果
- 從測試數據可以看出, 不管是8888埠伺服器宕機之前, 還是宕機之後, 三台伺服器接收的訪問量和性能指數成正比. 成功地驗證了一致性哈希演算法的負載均衡作用.
四. 擴展思考
- 初識一致性哈希演算法的時候, 對這種奇特的思路佩服得五體投地. 但是一致性哈希演算法除了能夠讓後端伺服器實現負載均衡, 還有一個特點可能是其他負載均衡演算法所不具備的.
- 這個特點是基於哈希函數的, 我們知道通過哈希函數, 固定的輸入能夠產生固定的輸出. 換句話說, 同樣的請求會路由到相同的伺服器. 這點就很牛逼了, 我們可以結合一致性哈希演算法和緩存機制提供後端伺服器的性能.
- 比如說在一個分散式系統中, 有一個伺服器集群提供查詢用戶信息的方法, 每個請求將會帶著用戶的
uid
到達, 我們可以通過哈希函數進行處理(從上面的演示代碼可以看到, 這點是可以輕鬆實現的), 使同樣的uid
路由到某個獨定的伺服器. 這樣我們就可以在伺服器上對該的uid
背後的用戶信息進行緩存, 從而減少對資料庫或其他中間件的操作, 從而提高系統效率. - 當然如果使用該策略的話, 你可能還要考慮緩存更新等操作, 但作為一種優良的策略, 我們可以考慮在適當的場合靈活運用.
- 以上思考受啟發於
Dubbo
框架中對其實現的四種負載均衡策略的描述.