在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。 在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用n ...
在上一個版本中利用netty實現了簡單的一對一的RPC,需要手動設置服務地址,限制性較大。
在本文中,利用zookeeper作為服務註冊中心,在服務端啟動時將本地的服務信息註冊到zookeeper中,當客戶端發起遠程服務調用時,先從zookeeper中獲取該服務的地址,然後根據獲得的這個地址來利用netty進行網路傳送。
在服務端和註冊中心之間需要建立監聽,當服務信息發生變化或網路連接等問題時需要對註冊中心的服務信息進行修改。在本文中創建了服務註冊監控中心,利用心跳機制來判斷與服務端是否有較穩定的連接,當出現網路不穩定時,則從註冊中心中刪除屬於該服務端的服務信息。在本項目中設定在5分鐘內3次以上沒有發送心跳包為不穩定狀態。
關於心跳機制,之前有一篇文章介紹過:Dubbo心跳機制
zookeeper註冊中心
zookeeper是hadoop中的一個重要組件,其主要是作為分散式協調服務
zookeeper採用節點樹的數據模型,類似linux文件系統。
每個節點稱做一個ZNode,每個ZNode都可以通過路徑唯一標識,同時每個節點還可以存儲少量數據。
本項目借鑒dubbo的註冊中心模型來設計本文的註冊中心。
總體上設計了四級節點,在一個節點是一個持久節點/register
,表示是記錄註冊服務的區域。二級節點是服務介面名,三級節點是遠程服務ip地址,該節點是臨時節點,節點存儲的數據是具體的實現類名。
在客戶端會根據服務介面名在註冊中心進行查找,得到遠程服務ip地址,並根據節點中存儲的具體實現類名進行反射。
首先進行zookeeper初始化,利用了CuratorFramework
有關類
private static void init() {
RetryPolicy retryPolicy = new RetryNTimes(ZKConsts.RETRYTIME, ZKConsts.SLEEP_MS_BEWTEENR_RETRY);
client = CuratorFrameworkFactory.builder().connectString(ZKConsts.ZK_SERVER_PATH)
.sessionTimeoutMs(ZKConsts.SESSION_TIMEOUT_MS).retryPolicy(retryPolicy)
.namespace(ZKConsts.WORK_SPACE).build();
client.start();
}
服務的註冊代碼
public static void register(URL url) {
try {
String interfaceName = url.getInterfaceName();
String implClassName = url.getImplClassName();
Stat stat = client.checkExists().forPath(getPath(interfaceName, url.toString()));
if (stat != null) {
System.out.println("該節點已存在!");
client.delete().forPath(getPath(interfaceName, url.toString()));
}
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
//許可權控制,任何連接的客戶端都可以操作該屬性znode
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(getPath(interfaceName, url.toString()), implClassName.getBytes());
System.out.println(getPath(interfaceName, url.toString()));
} catch (Exception e) {
e.printStackTrace();
}
}
根據服務介面名來獲取遠程服務連接地址
public static URL random(String interfaceName) {
try {
System.out.println("開始查找服務節點:" + getPath(interfaceName));
List<String> urlList = client.getChildren().forPath("/" + interfaceName);
System.out.println("結果:" + urlList);
String serviceUrl = urlList.get(0);
String[] urls = serviceUrl.split(":");
String implClassName = get(interfaceName, serviceUrl);
return new URL(urls[0], Integer.valueOf(urls[1]), interfaceName, implClassName);
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
}
return null;
}
註冊中心與服務端進行連接時需要判斷是否維持了穩定的連接,如果服務端出現宕機等情況時需要從註冊中心中刪除這些服務。
以前的一些處理機制,有session機制和wacher機制。
session機制
每個zookeeper註冊中心與服務端進行連接時會創建一個session,在設置的sessionTimeout內,服務端會與註冊中心進行心跳包的定時發送,從而感知每個客戶端是否宕機,如果創建某個臨時Znode節點對應的session銷毀時,相應的臨時節點也會被註冊中心刪除。
watcher機制
針對每個節點的操作,都有要給監督者進行watcher,當監控的某個節點發生了變化,則會觸發watcher事件。註冊中心的watcher是一次性的,觸發後會被銷毀。父節點,子節點增刪改都能夠觸發watcher。觸發銷毀後,下次需要監聽時還需要再註冊一次。
本文心跳機制
服務端定時向註冊中心發送本機地址,看作心跳數據包,而註冊中心監控則維持一個channelId和具體地址的map,並且通過IdleHandler監聽空閑事件,到達一定的空閑次數則認為不活躍,當不活躍時,zookeeper刪除對應的url節點。該版本實現了上面的內容,後續的步驟在以後的版本實現。
如果10s內沒有觸發讀,就會執行userEventTriggered
方法。如果5分鐘中出現兩次不活躍次數,就認定該連接不穩定,註冊中心會移除屬於該服務端的服務。你也可以根據實際情況設定不穩定標準。
service.scheduleAtFixedRate(() -> {
if (future.channel().isActive()) {
int time = new Random().nextInt(5);
log.info("本次定時任務獲取的隨機數:{}", time);
if (time > 3) {
log.info("發送本地地址到註冊中心:{}", url);
future.channel().writeAndFlush(url);
}
}
}, 60, 60, TimeUnit.SECONDS);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent state = (IdleStateEvent)evt;
if (state.state().equals(IdleState.READER_IDLE)) {
log.info("讀空閑");
} else if (state.state().equals(IdleState.WRITER_IDLE)) {
log.info("寫空閑");
}
//在一定時間內讀寫空閑才會關閉鏈接
else if (state.state().equals(IdleState.ALL_IDLE)) {
if (++inActiveCount == 1) {
start = System.currentTimeMillis();
}
int minute = (int)((System.currentTimeMillis() - start) / (60 * 1000)) + 1;
log.info("第{}次讀寫都空閑,計時分鐘數{}", inActiveCount, minute);
if (inActiveCount > 2 && minute < 5) {
log.info("移除不活躍ip");
removeAndClose(ctx);
} else {
if (minute >= 5) {
log.info("新周期開始");
start = 0;
inActiveCount = 0;
}
}
}
}
}
具體實現代碼:RPC第二版