在dubbo中,關於註冊中心Registry的有關實現封裝在了dubbo registry模塊中。提供者(Provider)個消費者(Consumer)都是通過註冊中心進行資源的調度。當服務啟動時,provider會調用註冊中心的register方法將自己的服務通過url的方式發佈到註冊中心,而co ...
在dubbo中,關於註冊中心Registry的有關實現封裝在了dubbo-registry模塊中。提供者(Provider)個消費者(Consumer)都是通過註冊中心進行資源的調度。當服務啟動時,provider會調用註冊中心的register方法將自己的服務通過url的方式發佈到註冊中心,而consumer訂閱其他服務時,會將訂閱的服務通過url發送給註冊中心(URL中通常會包含各種配置)。當某個服務被關閉時,它則會從註冊中心中移除,當某個服務被修改時,則會調用notify方法觸發所有的監聽器。
首先簡單介紹一下在dubbo的基本統一數據模型URL
統一數據模型URL
在dubbo中定義的url與傳統的url有所不同,用於在擴展點之間傳輸數據,可以從url參數中獲取配置信息等數據,這一點很重要。
描述一個dubbo協議的服務
dubbo://192.168.1.6:20880/moe.cnkirito.sample.HelloService?timeout=3000
描述一個消費者
consumer://30.5.120.217/org.apache.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=1209&qos.port=33333&side=consumer×tamp=1545721827784
接下來將著重介紹幾個重要的類。
AbstractRegistry
AbstractRegistry實現的是Registry介面,是Registry的抽象類。為了減輕註冊中心的壓力,在該類中實現了把本地url緩存到記憶體緩存property文件中,並且實現了註冊中心的註冊、訂閱等方法。
在該類中有介個關於url的變數。
private final Set<URL> registered = new ConcurrentHashSet<URL>();
-> 記錄已經註冊服務的URL集合,註冊的URL不僅僅可以是服務提供者的,也可以是服務消費者的。private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
-> 消費者url訂閱的監聽器集合private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
-> 某個消費者被通知的服務URL集合,最外部URL的key是消費者的URL,value是一個map集合,裡面的map中的key為分類名,value是該類下的服務url集合。private URL registryUrl;
-> 註冊中心URLprivate File file;
-> 本地磁碟緩存文件,緩存註冊中心的數據初始化
public AbstractRegistry(URL url) {
//1. 設置配置中心的地址
setUrl(url);
//2. 配置中心的URL中是否配置了同步保存文件屬性,否則預設為false
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
//3. 配置信息本地緩存的文件名
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
//逐層創建文件目錄
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
//如果現有配置緩存,則從緩存文件中載入屬性
loadProperties();
notify(url.getBackupUrls());
}
載入本地磁碟緩存文件到記憶體緩存中,也就是把文件中的數據寫入到properties中
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
in = new FileInputStream(file);
// 把數據寫入到記憶體緩存中
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry store file " + file + ", data: " + properties);
}
} catch (Throwable e) {
logger.warn("Failed to load registry store file " + file, e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
註冊與取消註冊
對registered變數執行add和remove操作
@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " + url);
}
registered.add(url);
}
@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
registered.remove(url);
}
訂閱與取消訂閱
通過消費者url從subscribed變數中獲取該消費者的所有監聽器集合,然後將該監聽器放入到集合中,取消同理。
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// 獲得該消費者url 已經訂閱的服務 的監聽器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
// 添加某個服務的監聽器
listeners.add(listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
listeners.remove(listener);
}
}
服務的恢復
註冊的恢復包括註冊服務的恢復和訂閱服務的恢復,因為在記憶體中表留了註冊的服務和訂閱的服務,因此在恢復的時候會重新拉取這些數據,分別調用發佈和訂閱的方法來重新將其錄入到註冊中心中。
protected void recover() throws Exception {
// register
//把記憶體緩存中的registered取出來遍歷進行註冊
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
register(url);
}
}
// subscribe
//把記憶體緩存中的subscribed取出來遍歷進行訂閱
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
subscribe(url, listener);
}
}
}
}
通知
protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) return;
// 遍歷訂閱URL的監聽器集合,通知他們
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
// 匹配
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
// 遍歷監聽器集合,通知他們
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
/**
* 通知監聽器,URL 變化結果
* @param url
* @param listener
* @param urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// 將urls進行分類
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
// 按照url中key為category對應的值進行分類,如果沒有該值,就找key為providers的值進行分類
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
// 分類結果放入result
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
// 獲得某一個消費者被通知的url集合(通知的 URL 變化結果)
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
// 添加該消費者對應的url
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
// 處理通知監聽器URL 變化結果
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 把分類標實和分類後的列表放入notified的value中
// 覆蓋到 `notified`
// 當某個分類的數據為空時,會依然有 urls 。其中 `urls[0].protocol = empty` ,通過這樣的方式,處理所有服務提供者為空的情況。
categoryNotified.put(category, categoryList);
// 保存到文件
saveProperties(url);
//通知監聽器
listener.notify(categoryList);
}
}
在構造函數的最後一句,調用notify(url.getBackupUrls()); 來將註冊中心url返回的urls來進行通知。從下麵代碼可以開出返回的urls是通過url的參數獲得的。
public List<URL> getBackupUrls() {
List<URL> urls = new ArrayList<URL>();
urls.add(this);
String[] backups = getParameter(Constants.BACKUP_KEY, new String[0]);
if (backups != null && backups.length > 0) {
for (String backup : backups) {
urls.add(this.setAddress(backup));
}
}
return urls;
}
然後獲取遍歷所有訂閱URL,類型Map<URL,Set<NotifyListener>>
,判斷遍歷中的當前url與傳入的backupURL是否匹配,匹配了繼續向下執行,否則則跳過這個url,再處理下一個url。當向下執行時,獲取遍歷當前url的監聽器。對每個監聽器執行notify(url, listener, filterEmpty(url, urls))
protected static List<URL> filterEmpty(URL url, List<URL> urls) {
if (urls == null || urls.isEmpty()) {
List<URL> result = new ArrayList<URL>(1);
result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
return result;
}
return urls;
}
如果urls為空,則將根據url的信息新建一個url,並設置協議為空協議,放入到urls中。
然後執行notify方法,將backupURLS進行分類,放入到result中。
在上述中遍歷所有訂閱的urls,然後在每個url中再執行nofity,所以接下來的步驟可以理解成遍歷訂閱的urls,在迴圈內部獲取每個url的被通知的urls集合。
每個url獲取一個被通知的urls集合,categoryNotified
之後遍歷backURLs,它會覆蓋掉原來被通知的集合categoryNotified
遍歷結束後,會將結果保存到文件中,
最後通知監聽器處理,最後的這個通知方法在之後的篇章解釋。