服務引入 服務引入使用reference標簽來對要引入的服務進行配置,包括服務的介面 ,名稱,init,check等等配置屬性。 在DubboNamespaceHandler中,我們可以看到reference標簽是通過引入一個ReferenceBean類型的bean實現的,那麼我們就以這個bean為 ...
服務引入
服務引入使用reference標簽來對要引入的服務進行配置,包括服務的介面 ,名稱,init,check等等配置屬性。
在DubboNamespaceHandler中,我們可以看到reference標簽是通過引入一個ReferenceBean類型的bean實現的,那麼我們就以這個bean為入口,一探dubbo服務引入的究竟。
ReferenceBean概述
首先看一下ReferenceBean的繼承結構:
- 繼承了ReferenceConfig,用於存放通過配置文件或api設置的一些配置,
- 實現了若幹介面,全部都與spring框架相關,關係到bean的生命周期以及對一些spring基礎設施類的感知,
- 實現FactoryBean。說明是一個工廠bean, 我們將介面作為依賴引入到其他bean中,或者直接調用ApplicationContext.getBean方法時,會通過這個工廠bean獲取一個實際類型的bean
容易想到,這個被引入的服務的引用非獲取應該與FactoryBean相關。 - ApplicationContextAware。Aware介面,目的是為了持有spring容器的引用,以便能夠獲取其他的依賴的bean
- InitializingBean。 在spring的bean被實例化後,會一次調用BeanPostProcessor.postProcessBeforeInitialization, InitializingBean.afterPropertiesSet, 自定義的初始化方法(通過init屬性配置),BeanPostProcessor.postProcessAfterInitialization,所以實現了InitializingBean介面的bean在實例化時,spring框架會自動調用afterPropertiesSet方法
- DisposableBean。 bean是一個有聲明周期的實體,在spring容器關閉時會自動銷毀這個bean
afterPropertiesSet
這個方法主要是做一些配置,比如初始化配置中心bean,消費者配置類ConsumerConfig,全局配置類ApplicationConfig,等等,還有一些其他的配置,大致與服務導出的過程差不多。
FactoryBean.getObject
很顯然服務引入的入口就在這個方法中。
兜兜轉轉,期間經過幾個方法調用,忽略中間涉及到的配置部分,我們來到核心方法init
ReferenceConfig.init
public synchronized void destroy() {
if (ref == null) {
return;
}
if (destroyed) {
return;
}
destroyed = true;
try {
invoker.destroy();
} catch (Throwable t) {
logger.warn("Unexpected error occured when destroy invoker of ReferenceConfig(" + url + ").", t);
}
invoker = null;
ref = null;
}
private void init() {
// 用一個volatile變數標記是否已經初始化過
if (initialized) {
return;
}
// 這裡還是有可能多個線程同時初始化,不如學spring, 直接加鎖
initialized = true;
// 檢查stub和local合法性
checkStubAndLocal(interfaceClass);
// 檢查mock合法性
checkMock(interfaceClass);
// 存放參數
Map<String, String> map = new HashMap<String, String>();
// size屬性設為consumer,即消費端
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
// 添加運行時的幾個參數,之前在分析服務導出 的時候已經講過
// 1. dubbo協議的版本號
// 2. dubbo框架的發行版本號,可以通過package-info或者jar包名稱獲取
// 3. 時間戳
// 4. 當前jvm進程號
appendRuntimeParameters(map);
// 對於非泛化服務,添加如下配置
if (!isGeneric()) {
// 修訂版本號
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 添加介面名參數
map.put(Constants.INTERFACE_KEY, interfaceName);
// 接下來的幾個方法與服務導出中的處理過程類似,都是按照優先順序覆蓋配置
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
// 最後添加自身的參數配置,即reference標簽配置的參數,
// 顯然這些配置應該是優先順序最高的,所以最後添加以覆蓋之前的配置
appendParameters(map, this);
Map<String, Object> attributes = null;
if (CollectionUtils.isNotEmpty(methods)) {
attributes = new HashMap<String, Object>();
for (MethodConfig methodConfig : methods) {
appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
// 如果該方法被設置為不重試,那麼添加一個參數:方法名.retries=0
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
}
}
// 通過環境變數或jvm系統變數獲取屬性DUBBO_IP_TO_REGISTRY,即要發送給註冊中心的主機ip地址
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
// 如果從環境變數或jvm系統變數沒獲取到,那麼直接獲取本地ip
// 如果獲取不到本地ip,最後只有用環回地址
hostToRegistry = NetUtils.getLocalHost();
}
// 添加參數
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
// 關鍵一步,創建代理
ref = createProxy(map);
String serviceKey = URL.buildKey(interfaceName, group, version);
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
}
這個方法大致分為兩塊,前半部分都是在構建參數的map,最後用這些參數創建一個代理,
添加的參數包括運行時參數,版本號,方法名,按優先順序分別添加全局配置,分組配置,消費端配置,以及reference標簽的配置,用於註冊的ip
ReferenceConfig.createProxy
private T createProxy(Map<String, String> map) {
// 首先判斷是不是本地引用,
if (shouldJvmRefer(map)) {
URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
// 創建一個本地服務引用,通過指定的injvm協議創建
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
// 用戶指定的url,可以是點對點調用,也可以指定註冊中心的url
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
// 可以是多個url,以分號(;)號分隔
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// refer是註冊中心url的參數key名稱
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
//
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
checkRegistry();
// 用戶指定的url,優先用指定的url
// 可以是點對點調用,也可以指定註冊中心的url
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
if (urls.size() == 1) {
// 創建Invoker
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
if (shouldCheck() && !invoker.isAvailable()) {
// make it possible for consumer to retry later if provider is temporarily unavailable
initialized = false;
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
// 重要的一步,創建代理
return (T) proxyFactory.getProxy(invoker);
}
大致分為三種情況:
- 如果參數中指明瞭是本地引用,那麼使用InjvmProtocol創建一個本地的Invoker
- 如果用戶在指定了url,那麼優先用用戶顯式指定的url
- 如果沒有顯式配置的url,那麼就載入所有的註冊中心的url
載入完url之後,調用Protocol.refer方法創建一個服務引用,即一個Invoker,
我們分析最普通的情況,即通註冊中心引用服務的情況,這種情況是調用RegistryProtocol.refer方法創建Invoker
RegistryProtocol.refer
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = URLBuilder.from(url)
// registry屬性預設是dubbo
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
// 前面protocol屬性被設為registry,
// 而原本的protocol屬性被保存在registry屬性中
// 到這裡將protocol設為registry已經完成了他的使命,即將Protocol類型路由到RegistryProtocol中
// 所以這是自然要將protocol屬性設回原本的值,而將registry屬性丟棄
.removeParameter(REGISTRY_KEY)
.build();
// 這裡根據協議決定具體使用哪種Registry
// registryFactory成員屬性是通過ExtensionLoader的IOC機制自動註入的,也就是通過ExtensionFactory獲取到的
// 對於帶有SPI註解的介面,通過IOC方式註入的是自適應的擴展類
// 以常用的zookeeper註冊中心為例,這裡通過ZookeeperRegistryFactory獲取到了一個ZookeeperRegistry
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 創建Invoker
// 這裡的cluster成員屬性同樣也是通過ExtensionLoader的IOC自動註入的,
// 同樣註入的是一個自適應的Cluster
return doRefer(cluster, registry, type, url);
}
對url進行一些處理,然後獲取一個註冊服務Registry對象,一般常用的有ZookeeperRegistry。
接下來是對分組信息的處理,這裡由於不是很常用,我們暫時跳過。
RegistryProtocol.doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 創建一個服務目錄
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 訂閱url
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 註冊一個消費者
registry.register(directory.getRegisteredConsumerUrl());
}
// 創建路由鏈
directory.buildRouterChain(subscribeUrl);
// 向註冊中心訂閱,訂閱providers,configurators,routers三個目錄的服務
// 接收註冊中心的變化信息
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// 將目錄封裝成一個Invoker
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
這裡首先創建了一個服務目錄,然後向註冊中心註冊一個消費者,創建路由鏈,向註冊中心訂閱以接收服務變化的通知,
最關鍵的一步是cluster.join,這一步將服務目錄封裝成一個Invoker,我們知道從註冊中心是可以獲取多個服務提供者的。
- Directory,服務目錄,封裝了從註冊中心發現服務,並感知服務變化的邏輯
- Cluster,這個類實際上只起到過渡的作用,通過它的join方法返回FailoverClusterInvoker等對象,這些類封裝了服務調用過程中的故障轉移,重試,負載均衡等邏輯
這兩個介面會單獨在寫文章來分析,本文我們主要是為了理清服務引用的主幹邏輯。
ProxyFactory.getProxy
我們回到ReferenceConfig中,通過以上的一些步驟獲取到invoker之後,創建服務引用的過程並沒有結束。
試想,服務引入後,用戶是需要在代碼中直接調用服務介面中的方法的,而Invoker只有一個invoke方法,顯然,我們還需要一個代理,來使的方法調用對用戶是透明的,即用戶不需要感知到還有Invoker這個東西的存在。所以接下來就分析一下創建代理的過程。
ProxyFactory這個類在服務導出的部分已經接觸過。服務導出時,調用ProxyFactory.getInvoker方法獲取一個Invoker類,用於將發送過來的調用信息路由到介面的不同方法上。
而在服務引入的過程中,我們需要創建一個代理,將介面中的不同的方法調用轉換成Invoker的invoke調用,併進一步轉化為網路報文發送給服務提供者,並將返回的結果信息返回給服務調用者。
預設的ProxyFactory是JavassistProxyFactory,繼承自AbstractProxyFactory,我們先從AbstractProxyFactory看起
AbstractProxyFactory.getProxy
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
這個方法通過Proxy.getProxy生成一個Proxy類示例,然後調用Proxy實例的newInstance方法返回代理對象,我們重點分析一下Proxy.getProxy方法
Proxy.getProxy
這個方法就不貼代碼了,太長,大概的邏輯是生成兩個類的代碼,然後調用javassist庫編譯載入獲取Class對象,生成的這兩個類一個實現了用戶的服務介面的代理類,另一個繼承了Proxy,用於生成代理類的實例,對於這部分代碼,我認為逐字逐句第分析代碼生成部分的邏輯意義不大,不如直接看一下生成後的代碼長什麼樣子,這樣能夠更加直觀地理解代碼生成的邏輯。
示例介面:
public interface I2 {
void setName(String name);
void hello(String name);
int showInt(int v);
float getFloat();
void setFloat(float f);
}
生成的代理類代碼:
public class proxy0 implements org.apache.dubbo.common.bytecode.I2 {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0(java.lang.reflect.InvocationHandler arg0) {
handler = $1;
}
public float getFloat() {
Object[] args = new Object[0];
Object ret = handler.invoke(this, methods[0], args);
return ret == null ? (float) 0 : ((Float) ret).floatValue();
}
public void setName(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[1], args);
}
public void setFloat(float arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[2], args);
}
public void hello(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[3], args);
}
public int showInt(int arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[4], args);
return ret == null ? (int) 0 : ((Integer) ret).intValue();
}
}
生成的Proxy類代碼:
public class Proxy0 extends org.apache.dubbo.common.bytecode.Proxy {
public Object newInstance(java.lang.reflect.InvocationHandler h) {
return new org.apache.dubbo.common.bytecode.proxy0($1);
}
}
當然了,上面的代碼只是初步的代碼,後面肯定要經過一定的處理才能編譯,不過這都是javassist庫的事情,通過上面生成的代碼我們很容易就知道dubbo生成動態代理的邏輯。
從生成的代理類代碼可以看出來,代理類緩存了介面的所有方法的Method對象,放到一個數組中,數組下標和方法是嚴格對應的,這樣做的好處是不需要每次調用方法的時候都通過反射去獲取Method對象,那樣效率太低。代理類調用每個方法的邏輯其實都是一樣的,都是調用了InvocationHandler.invoke方法,生成的這個代理類感覺就像是一個門面,唯一的作用就是把所有的方法調用導向invoke調用,並傳遞參數。
InvokerInvocationHandler.invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(createInvocation(method, args)).recreate();
}
這個方法的邏輯也很簡單,直接調用的Invoker.invoke方法,而Invoker對象是通過構造方法傳進來的。所以核心的處理邏輯還是在Invoker對象中,其他的基本都是傳參,方法調用的作用。
至於createInvocation方法的邏輯就更簡單了,就是把方法名,參數類型列表,調用參數等取出來,然後封裝成一個RpcInvocation對象,然後用這個RpcInvocation對象作為參數調用Invoker.invoke方法。
那麼Invoker對象又是怎麼來的呢?是通過服務目錄也就是Directory對象內部生成的,服務目錄會監聽註冊中心,並獲取服務提供者的信息,然後生成代表這些服務提供者的Invoker對象,並通過Cluster對象將多個Invoker對象封裝在一起,內部實現故障轉移,服務路由,負載均衡等邏輯。服務目錄,集群,以及負載均衡的內容都比較多,而且模塊獨立性較強,所以可以分開來看這些模塊的代碼。
總結
這一節的主要內容是服務引用。服務引用的入口是spring配置文件中的reference標簽,這個標簽由ReferenceBean處理,ReferenceBean是一個FactoryBean,通過它的getObject方法獲取引用,經過一些調用鏈,最終生成服務介面引用的核心邏輯在ReferenceConfig.init方法中。這個方法的邏輯大致分為三個部分:
- 參數處理。init方法的大部分代碼都是在進行參數的處理,包括一些緩存的邏輯,狀態判斷,合法性檢查等等。
- 列出所有的url,包括顯示指定的url, 註冊中心url,通過Protocol介面的refer方法創建Invoker對象,創建出來的Invoker對象已經是經過Cluster對象封裝了故障轉移,服務路由,負載均衡邏輯的了。
Invoker對象最主要的功能實際上是封裝了通信細節,包括調用參數和返回結果的序列化反序列化,創建TCP連接,發送報文等邏輯。 使用上面生成的Invoker對象生成一個服務介面的代理類,生成的這個代理類負責將對介面方法的調用轉化為調用內部的Invoker對象的invoke方法的調用。
而生成代理類的邏輯封裝在ProxyFactory介面中,預設使用javassist生成動態代理,但是代理類代碼生成的邏輯仍然是dubbo自己實現,只是用javassist庫進行代碼編譯載入。dubbo在生成動態代理是做了一些比較重要的優化:
- 將被代理的介面的所有方法的Method對象緩存起來,存放到一個數組中,並將方法與數組下標對應起來,這樣在方法調用時可以很快獲取到Method對象,而不用通過反射再獲取一遍Method對象,方法調用的效率大大提升。(PS: 這裡我最初的理解錯了,實際上jdk動態代理也是差不多的套路,將各個方法的Method對象在類載入是就緩存起來,每次方法調用時不需要再次通過反射獲取Methodd對象。)
所以問題是:dubbo實現的動態代理和jdk實現的動態代理有什麼區別?dubbo為啥要自己實現??