前言 本文基於Dubbo2.6.x版本,中文註釋版源碼已上傳github:xiaoguyu/dubbo 上一篇文章,講了Dubbo的服務導出: Dubbo源碼(三) - 服務導出(生產者) 本文,咱們來聊聊Dubbo的服務引用。 本文案例來自Dubbo官方Demo,路徑為: dubbo/dubbo- ...
前言
本文基於Dubbo2.6.x版本,中文註釋版源碼已上傳github:xiaoguyu/dubbo
上一篇文章,講了Dubbo的服務導出:
本文,咱們來聊聊Dubbo的服務引用。
本文案例來自Dubbo官方Demo,路徑為:
dubbo/dubbo-demo/dubbo-demo-consumer/
服務引用原理
Dubbo服務引用對象的生成,是在ReferenceBean#getObject()
方法中
其生成時機有兩個:
-
餓漢式
ReferenceBean
對象繼承了InitializingBean
介面public void afterPropertiesSet() throws Exception { ...... Boolean b = isInit(); if (b == null && getConsumer() != null) { b = getConsumer().isInit(); } if (b != null && b.booleanValue()) { getObject(); } }
從代碼可以看出,需要開啟init屬性
-
懶漢式
因為
ReferenceBean
繼承了FactoryBean
介面,當服務被註入到其他類中時,Spring會調用getObject方法
而服務的調用方式分三種:
- 本地引用
- 直連方式引用
- 註冊中心引用
不管是哪種引用方式,最後都會得到一個 Invoker 實例。
我們再次看看Invoker的官方解釋:
Invoker 是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起 invoke 調用,它有可能是一個本地的實現,也可能是一個遠程的實現,也可能一個集群實現。
在Dubbo中,Invoker是多重套娃的(可以理解為裝飾器模式或者包裝增強類),通過一層層的包裝,使普通的Invoker具備了負載均衡、集群的功能。
最後,為服務介面(本文為DemoService)生成代理對象,Invoker#invoke(Invocation invocation)實現服務的調用。
本文不討論直連方式引用,也不討論負載均衡、集群等功能(後續再開坑說)。
創建代理對象
夢的開始,ReferenceBean#getObject()
public Object getObject() throws Exception {
return get();
}
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
// init 方法主要用於處理配置,以及調用 createProxy 生成代理類
init();
}
return ref;
}
很明顯,在 init 方法中生成了ref
private void init() {
// 省略大堆的檢查以及參數處理
......
//attributes are stored by system context.
// 存儲 attributes 到系統上下文中
StaticContext.getSystemContext().putAll(attributes);
// 創建代理類
ref = createProxy(map);
// 根據服務名,ReferenceConfig,代理類構建 ConsumerModel,
// 並將 ConsumerModel 存入到 ApplicationModel 中
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
直接看 createProxy(map)
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
// isJvmRefer 的賦值處理
......
// 本地引用
if (isJvmRefer) {
// 生成invoker
......
// 遠程引用
} else {
// 生成invoker、合併invoker
......
}
// invoker 可用性檢查
......
// 生成代理類
return (T) proxyFactory.getProxy(invoker);
}
這個方法主要做了兩件事
- 創建以及合併Invoker
- 生成代理對象
這裡先略過invoker的處理,先看看代理對象的生成。
proxyFactory 是自適應拓展類,預設實現是JavassistProxyFactory
,getProxy 方法在其父類AbstractProxyFactory
中
// 這是AbstractProxyFactory類的方法
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
return getProxy(invoker, false);
}
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
// 獲取介面列表
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
// 切分介面列表
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
// 設置服務介面類和 EchoService.class 到 interfaces 中
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// 載入介面類
interfaces[i + 2] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
// 為 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827
if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
// 創建新的 interfaces 數組
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
// 設置 GenericService.class 到數組中
interfaces[len] = GenericService.class;
}
// 調用重載方法
return getProxy(invoker, interfaces);
}
這裡的大段邏輯都是在處理interfaces
參數,此時interfaces
的值為{ DemoService.class, EchoService.class }
繼續看子類JavassistProxyFactory
實現的 getProxy(invoker, interfaces) 方法
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
// 源碼是上面那行,我們將上面的代碼改下麵的格式
Proxy proxy = Proxy.getProxy(interfaces);
return (T) proxy.newInstance(new InvokerInvocationHandler(invoker));
}
註意:我下麵開始講的 proxy 不是平時理解的代理對象,你可以理解為一個生成代理對象的 builder
此方法做了兩件事:
- 生成
proxy
對象 - 調用 proxy 的
newInstance
方法生成實際的代理對象
這裡,我就不講 Proxy.getProxy 的源碼了,感興趣的朋友自行瞭解。簡單講下裡面做了什麼:
-
構建服務介面(本文為DemoService)的代理類的位元組碼對象,其生成的位元組碼對象如下:
這裡簡化了下代碼,實際上還實現了EchoService介面
package org.apache.dubbo.common.bytecode; public class proxy0 implements org.apache.dubbo.demo.DemoService { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public proxy0() { } public proxy0(java.lang.reflect.InvocationHandler arg0) { handler = $1; } public java.lang.String sayHello(java.lang.String arg0) { Object[] args = new Object[1]; args[0] = ($w) $1; Object ret = handler.invoke(this, methods[0], args); return (java.lang.String) ret; } }
-
構建生成服務介面代理對象的builder
package com.alibaba.dubbo.common.bytecode; public class Proxy0 extends com.alibaba.dubbo.common.bytecode.Proxy { public Proxy0() { } public Object newInstance(java.lang.reflect.InvocationHandler h){ return new com.alibaba.dubbo.common.bytecode.proxy0($1); } }
註意一下:一個是proxy0,另一個是Proxy0,包名不同,類名的p子也有大小寫的區別,別搞混了
再對照之前的 getProxy 方法
Proxy.getProxy(interfaces) 生成的是 Proxy0(大寫的P)
proxy.newInstance(new InvokerInvocationHandler(invoker)) 生成的是 proxy0(小寫的p)
至此,Dubbo服務引用對象已生成,可以看到,生成的引用對象結構也很簡單,主要是依賴 invoker 對象完成介面調用的,下麵就去看看 invoker 的生成。
創建Invoker
讓我們的視線重新回到createProxy
方法中
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
// isJvmRefer 的賦值處理
......
// 本地引用
if (isJvmRefer) {
// 生成本地引用 URL,協議為 injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 調用 refer 方法構建 InjvmInvoker 實例
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
// 遠程引用
} else {
// url 不為空,表明用戶可能想進行點對點調用
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
......
} else { // assemble URL from register center's configuration
// 載入註冊中心 url
......
}
// 單個註冊中心或服務提供者(服務直連,下同)
if (urls.size() == 1) {
// 調用 RegistryProtocol 的 refer 構建 Invoker 實例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多個註冊中心或多個服務提供者,或者兩者混合
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 獲取所有的 Invoker
for (URL url : urls) {
// 通過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時
// 根據 url 協議頭載入指定的 Protocol 實例,並調用實例的 refer 方法
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// 如果註冊中心鏈接不為空,則將使用 AvailableCluster
URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 創建 StaticDirectory 實例,並由 Cluster 對多個 Invoker 進行合併
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// invoker 可用性檢查
......
// 生成代理類
return (T) proxyFactory.getProxy(invoker);
}
本地引用
if (isJvmRefer) {
// 生成本地引用 URL,協議為 injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 調用 refer 方法構建 InjvmInvoker 實例
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
}
refprotocol 是自適應拓展,根據URL中的協議,確定實現類是InjvmProtocol
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}
其 refer 方法也很簡單,就生成了 InjvmInvoker 對象並返回。其實這裡搭配服務調用過程才容易理解(也就是InjvmInvoker#doInvoke(Invocation invocation方法),但本文是將服務引用過程,所以不展開。
遠程引用
遠程引用區分單註冊中心或單服務提供者和多註冊中心或多服務提供者,此處我們以單註冊中心或單服務提供者舉例,主要邏輯是下麵這段
// 單個註冊中心或服務提供者(服務直連,下同)
if (urls.size() == 1) {
// 調用 RegistryProtocol 的 refer 構建 Invoker 實例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
refprotocol 是自適應拓展類,根據 url 中的協議參數,其實現類為RegistryProtocol
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 獲取註冊中心實例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
// 將 url 查詢字元串轉為 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// 獲取 group 配置
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// 通過 SPI 載入 MergeableCluster 實例,並調用 doRefer 繼續執行服務引用邏輯
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 調用 doRefer 繼續執行服務引用邏輯
return doRefer(cluster, registry, type, url);
}
獲取註冊中心實例的過程,就是創建 zookeeper 連接,我在上一篇Dubbo服務導出博文中講過了,請自行查找。
我們繼續關註主要方法doRefer(cluster, registry, type, url)
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 創建 RegistryDirectory 實例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 設置註冊中心和協議
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服務消費者鏈接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 註冊服務消費者,在 consumers 目錄下新節點
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
registry.register(registeredConsumerUrl);
directory.setRegisteredConsumerUrl(registeredConsumerUrl);
}
// 訂閱 providers、configurators、routers 等節點數據
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一個註冊中心可能有多個服務提供者,因此這裡需要將多個服務提供者合併為一個
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
此方法主要做了4個操作:
-
創建一個 RegistryDirectory 實例,這是一個服務目錄對象。
服務目錄中存儲了一些和服務提供者有關的信息,通過服務目錄,服務消費者可獲取到服務提供者的信息,比如 ip、埠、服務協議等。通過這些信息,服務消費者就可通過 Netty 等客戶端進行遠程調用。
-
向註冊中心進行註冊
-
訂閱 providers、configurators、routers 等節點下的數據
-
生成invoker
cluster.join(directory) 預設實現類是
FailoverCluster
,這個是集群處理,後續文章再討論。
討論了這麼久,還沒看到如何連接暴露出來的遠程服務。
其實,連接遠程服務的操作,就是在訂閱 providers 節點數據時完成的
連接遠程服務
這裡,就不細說訂閱 providers 之後的各種處理,直接快進到遠程服務的連接。下麵放上訂閱節點數據到啟動遠程連接的調用路徑
別問為什麼是DubboProtocol
,因為服務導出時,也就會zookeeper的providers節點中註冊的url,就是Dubbo協議
下麵來看看DubboProtocol
的 refer 方法
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// 序列化優化處理
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
此方法創建了 DubboInvoker 並返回,但是 DubboInvoker 的構造方法沒啥好說的,就是一些類變數的賦值。我們主要關註 getClients ,其返回的是客戶端實例
private ExchangeClient[] getClients(URL url) {
// whether to share connection
// 是否共用連接
boolean service_share_connect = false;
// 獲取連接數,預設為0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置 connections,則共用連接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 獲取共用客戶端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客戶端
clients[i] = initClient(url);
}
}
return clients;
}
connections 的預設值為0,也就是 service_share_connect 為 true ,進入 getSharedClient(url) 方法
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress();
// 獲取帶有“引用計數”功能的 ExchangeClient
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
// 增加引用計數
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
if (referenceClientMap.containsKey(key)) {
return referenceClientMap.get(key);
}
// 創建 ExchangeClient 客戶端
ExchangeClient exchangeClient = initClient(url);
// 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這裡使用了裝飾模式
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
locks.remove(key);
return client;
}
}
此處就是一些引用計數和緩存操作,主要關註 ExchangeClient 的創建
private ExchangeClient initClient(URL url) {
// 獲取客戶端類型,預設為 netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
......
ExchangeClient client;
try {
// 獲取 lazy 配置,並根據配置值決定創建的客戶端類型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// 創建懶載入 ExchangeClient 實例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 創建普通 ExchangeClient 實例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
我們這裡不討論懶載入的情況。有見到了熟悉的 Exchangers, 在服務導出的時候,調用的是Exchangers.bind 方法,服務引用這裡用的是 Exchangers.connect
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 獲取 Exchanger 實例,預設為 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}
這裡 getExchanger(url) 返回的是 HeaderExchangeClient,直接進去看 connect 方法
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 這裡包含了多個調用,分別如下:
// 1. 創建 HeaderExchangeHandler 對象
// 2. 創建 DecodeHandler 對象
// 3. 通過 Transporters 構建 Client 實例
// 4. 創建 HeaderExchangeClient 對象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
HeaderExchangeClient內部持有 client ,並封裝了心跳的功能。我們重點在 Transporters.connect ,也就是Dubbo的網路傳輸層是如何連接的
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 數量大於1,則創建一個 ChannelHandler 分發器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取 Transporter 自適應拓展類,並調用 connect 方法生成 Client 實例
return getTransporter().connect(url, handler);
}
getTransporter() 獲取的是Transporter
的自適應拓展類,預設是NettyTransporter
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
NettyTransporter
的 connect 方法就創建了一個 NettyClient 對象,所以啟動連接的相關邏輯在其構造函數中
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
// NettyClient的父類AbstractClient
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
......
try {
doOpen();
} catch (Throwable t) {
......
}
try {
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
......
}
......
}
這裡又是使用模板方法,doOpen() 和 connect() 的具體實現在子類NettyClient
中,其作用就是創建對遠程服務的連接。這部分屬於Netty的API調用,就不做具體描述了。
總結
本文講述了Dubbo服務導出的過程,也就是創建服務介面代理對象的過程。其中服務調用、集群、負載均衡等部分並未描述,可以期待後續文章。