服務目錄 服務目錄對應的介面是Directory,這個介面里主要的方法是 List list(Invocation invocation) throws RpcException; 列出所有的Invoker,對於服務消費端而言,一個Invoker對應一個可用的服務提供者,底層封裝了一個tcp連接。當 ...
服務目錄
服務目錄對應的介面是Directory,這個介面里主要的方法是
List<Invoker<T>> list(Invocation invocation) throws RpcException;
列出所有的Invoker,對於服務消費端而言,一個Invoker對應一個可用的服務提供者,底層封裝了一個tcp連接。當然Invoker也可以是嵌套的,一個Invoker內包含了多個實際的Invoker。通過Cluster對象將一個服務目錄封裝成一個Invoker,內部包含了故障轉移,服務路由,負載均衡,等等相關的集群邏輯。
回到服務目錄,主要包括兩種服務目錄,StaticDirectory,RegistryDirectory。
- StaticDirectory。靜態服務目錄,顧名思義,這個目錄在創建的時候就會通過構造方法傳進一個Invoker列表,在之後過程中這個列表不再變化。
- RegistryDirectory。通過監聽註冊中心的服務提供者信息動態更新Invoker列表的服務目錄。
從上節服務引入,我們知道,不論是StaticDirectory還是RegistryDirectory,最終都會通過Cluster.join方法封裝為一個Invoker。由於靜態服務目錄的邏輯很簡單,這裡不再贅述,本節我們主要分析一下註冊中心的服務目錄。
RegistryDirectory概述
這個類除了繼承了AbstractDirectory,還實現了NotifyListener介面。NotifyListener介面是一個監聽類,用於監聽註冊中心配置信息的變更事件。我們首先簡單看一下RegistryDirectory中實現Directory介面的部分代碼。
AbstractDirectory.list
list方法的實現放在抽象類AbstractDirectory中,
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
return doList(invocation);
}
wishing就是一個狀態的判斷。doList是一個模板方法,由子類實現。
RegistryDirectory.doList
@Override
public List<Invoker<T>> doList(Invocation invocation) {
// 當狀態量forbidden為true時,服務調用被禁止
// 什麼時候forbidden為true呢??當url只有一個,且協議名稱為empty時,就以為這沒有服務提供者可用。
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
", please check status of providers(disabled, not registered or in blacklist).");
}
// 服務分組
if (multiGroup) {
return this.invokers == null ? Collections.emptyList() : this.invokers;
}
List<Invoker<T>> invokers = null;
try {
// Get invokers from cache, only runtime routers will be executed.
// 從緩存中取出Invoker列表,並經由服務路由獲取相應的Invoker
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
// FIXME Is there any need of failing back to Constants.ANY_VALUE or the first available method invokers when invokers is null?
/*Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
invokers = localMethodInvokerMap.get(methodName);
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}*/
return invokers == null ? Collections.emptyList() : invokers;
}
這個方法的主要邏輯是,首先判斷服務是否可用(根據forbidden狀態變數)。然後從路由鏈中取出Invoker列表。由於服務路由並不是本節的重點,所以我們只是簡單第看一下RouterChain.route方法
RouterChain.route
public List<Invoker<T>> route(URL url, Invocation invocation) {
List<Invoker<T>> finalInvokers = invokers;
for (Router router : routers) {
finalInvokers = router.route(finalInvokers, url, invocation);
}
return finalInvokers;
}
一次調用路由列表中的路由規則,最終返回經過多個路由規則路由過的Invoker列表。類似於責任鏈模式,有點像web容器的過濾器,或者是spring-mvc中的攔截器,都是一個鏈式的調用。
實際上我們平時一般較少使用到路由功能,所以這裡routers列表實際上是空的,這種情況下不用經過任何路由,直接原樣返回Invokers列表。而至於RouterChain內部的invokers成員是哪來的,RegistryDirectory監聽註冊中心發生變更後刷新本地緩存中的Invokers列表,並將其註入到RouterChain對象中,我們後面會講到。
RegistryDirectory.notify
接下來我們分析RegistryDirectory中最重要的方法,也就是監聽方法,用於監聽註冊中心的變更事件。
public synchronized void notify(List<URL> urls) {
// 將監聽到的url分類,
// 按照協議名稱或者category參數分為configurators,routers,providers三類
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
// 如果有變化的configurators類別的url,那麼將其轉化為參數並設到成員變數configurators
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
// 如果有變更的路由信息url,那麼將其轉化為Router對象並覆蓋原先的路由信息
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
// 最後處理最重要的服務提供者變更信息,並用這些url刷新當前緩存的Invoker
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}
首先將從註冊中心獲取到的最新的url進行分類,根據協議名稱或者category參數將url分為三類:configurators, routers, providers,
- configurators類型的url被轉換為Configurator列表,覆蓋本地緩存
- routers類型的url被轉換為Router列表,並被設置到routerChain對象中
- providers類型的url則被用於接下來的創建Invoker
RegistryDirectory.refreshOverrideAndInvoker
private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
// 用變更的配置信息覆蓋overrideDirectoryUrl成員變數
overrideDirectoryUrl();
// 刷新緩存中的Invokers
refreshInvoker(urls);
}
overrideDirectoryUrl方法的作用主要是用從註冊中心以及配置中心監聽到的變更的配置覆蓋本地的overrideDirectoryUrl成員變數中的配置。我們接著往下走。
RegistryDirectory.refreshInvoker
// 入參invokerUrls是從註冊中心拉取的服務提供者url
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
// 如果只有一個服務提供者,並且協議名稱是empty,說明無提供者可用
// 將狀態forbidden設為true, invokers設為空列表
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
// 記下舊的Invoker列表
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
// 如果從註冊中心沒有拉取到服務提供者信息,那麼使用之前緩存的服務提供者信息
// 這就是為什麼dubbo在註冊中心掛了之後消費者仍然能夠調用提供者,因為消費者在本地進行了緩存
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
// 如果註冊中心沒有提供者信息,並且本地也沒有緩存,那麼就沒法進行服務調用了
if (invokerUrls.isEmpty()) {
return;
}
// 將服務提供者url轉化為Invoker對象存放到map中
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
*
* 1. The protocol configured by the client is inconsistent with the protocol of the server.
* eg: consumer protocol = dubbo, provider only has other protocol services(rest).
* 2. The registration center is not robust and pushes illegal specification data.
*
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
// 將生成的Invoker列表設置到routerChain的緩存中,
// routerChain將對這些Invoker進行路由
routerChain.setInvokers(newInvokers);
// 處理服務分組的情況
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
// 將緩存的Invoker設置為新生成的
this.urlInvokerMap = newUrlInvokerMap;
try {
// 這裡實際上求新的Invoker列表和舊的差集,將不再使用的舊的Invoker銷毀
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
這個方法首先根據監聽到的提供者url列表判斷是否處於服務禁用狀態,判斷依據是:如果只有一個url,並且該url協議名稱是empty,說明無提供者可用,將forbidden變數設為true,即禁止服務調用,
並做一下其他的相關設置以及銷毀緩存中的Invoker。- 如果不是禁止狀態,繼續往下走。如果從註冊中心獲取到的url列表為空,那麼檢查本地緩存的url列表是否為空,如果緩存不為空就用緩存的列表。如果本地緩存也為空,說明無服務可用,直接返回。
- 如果如果從註冊中心獲取到的url列表不為空,說明有服務可用,這時就不會再去嘗試本地緩存了(因為緩存已經過期了),並且將本地緩存更新為新獲取的url列表。
- 將可用的提供者url列表轉化為Invoker列表。
- 將新創建的Invoker列表設置到routerChain中,這裡呼應了前文提到的在doList方法中,從routerChain對象中取出緩存的Invoker列表。
- 將本地緩存的url->Invoker map更新為新創建的。
最後銷毀緩存中不再使用的Invoker
RegistryDirectory.toInvokers
/**
* Turn urls into invokers, and if url has been refer, will not re-reference.
*
* @param urls 從註冊中心拉取的服務提供者信息
* @return invokers
*/
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
// 用於防止對相同的url重覆創建Invoker
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
// 如果消費端配置了協議名稱,那麼只有符合條件的提供者url才會被使用
// 這段代碼有待商榷 ,應該先把queryProtocols處理好,避免重覆做同樣的工作
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
// 如果協議名稱是empty,那麼忽略該條url
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// 如果當前classpath下找不到與提供者url中協議名稱相對應的Protocol類,那麼列印錯誤日誌同時忽略該條url
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
// 合併消費端設置的參數以及從註冊中心,配置中心監聽到的配置變更
URL url = mergeUrl(providerUrl);
// 以全路徑作為該url的唯一標識
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
// 如果之前已經創建過該url的Invoker對象,那麼就不用再重覆創建
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
// 檢查disabled和enabled參數的值
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// 真正創建Invoker的地方,
// InvokerDelegate只是個簡單的包裝類,不需要多說
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
- 首先根據協議名稱檢查url是否可用。url的協議必須在本地配置的協議列表中(如果沒有配置就不需要做此檢查);如果協議名稱是empty則忽略這個url;如果當前classpath下找不到與提供者url中協議名稱相對應的Protocol類,那麼列印錯誤日誌同時忽略該條url
- 合併消費端設置的參數以及從註冊中心,配置中心監聽到的配置變更
- 檢查disabled,enabled參數的值,判斷該url是否啟用,如果disabled為true則跳過該url;如果沒有disabled參數,檢查enabled參數,如果enabled為false則跳過該url,enabled預設是true。
- 調用Protocol.refer方法創建Invoker對象。
這裡需要說明一下,由於Directory不是通過SPI機制載入的,所以RegistryDirectory也不是通過ExtensionLoader載入的,所以也就不會受到ExtensionLoader的IOC影響。RegistryDirectory內部的protocol成員是在RegistryDirectory初始化之後通過調用setter方法設置進去的,是在RegistryProtocol.doRefer方法中完成的。而RegistryProtocol是通過ExtensionLoader機制載入的,會受到IOC影響,所以RegistryProtocol實例內部的protocol成員是通過ExtensionLoader的IOC機制自動註入的,是一個自適應的擴展類。
另外,InvokerDelegate只是個簡單的包裝類,不需要多說。
Invoker的創建最終還是通過protocol.refer方法,我們以最常用的dubbo協議為例進行分析。
DubboProtocol.refer
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
這個方法很簡單,直接new了一個DubboInvoker。
DubboInvoker
看一下doInvoke方法,這個方法主要是處理了同步,非同步,超時,單向調用等參數,並且對調用結果封裝了非同步調用,同步調用的邏輯。
真正執行遠程調用的部分是靠ExchangeClient實現的,再往下就是調用參數的序列化,tcp連接創建,發送報文,獲取響應報文,反序列化結果等的邏輯了,本文不再深入下去。