一、前言 前面講了服務是如何導出到註冊中心的。其實Dubbo做的一件事就是將服務的URL發佈到註冊中心上。那現在我們聊一聊消費者一方如何從註冊中心訂閱服務併進行遠程調用的。 二、引用服務時序圖 首先總的來用文字說一遍內部的大致機制 Actor:可以當做我們的消費者。當我們使用@Reference註解 ...
一、前言
前面講了服務是如何導出到註冊中心的。其實Dubbo做的一件事就是將服務的URL發佈到註冊中心上。那現在我們聊一聊消費者一方如何從註冊中心訂閱服務併進行遠程調用的。
二、引用服務時序圖
首先總的來用文字說一遍內部的大致機制
Actor:可以當做我們的消費者。當我們使用@Reference註解將對應服務註入到其他類中這時候Spring會第一時間調用getObject方法,而getObject中只有一個方法就是get()。這裡可以理解為消費者開始引入服務了。
餓漢式:在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務。
懶漢式:在 ReferenceBean 對應的服務被註入到其他類中時引用。Dubbo預設使用懶漢式。
ReferenceConfig:通過get方法其實是進入到ReferenceConfig類中執行init()方法。在這個方法里主要做了下麵幾件事情:
1,、對@Reference標註的介面查看是否合法,檢查該介面是不是存在泛型
2、在系統中拿到dubbo.resolve.file這個文件,這個文件是進行配置consumer的介面的。將配置好的consumer信息存到URL中
3、將配置好的ApplicationConfig、ConsumerConfig、ReferenceConfig、MethodConfig,以及消費者的IP地址存到系統的上下文中
4、接下來開始創建代理對象進入到ReferenceConfig的createProxy 。這裡還是在ReferenceConfig類中。上面的那些配置統統傳入該方法中。上面有提到resolve解析consumer為URL,現在就根據這個URL首先判斷是否遠程調用還是本地調用。
4.1若是本地調用,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例
4.2若是遠程調用,則讀取直連配置項,或註冊中心 url,並將讀取到的 url 存儲到 urls 中。然後根據 urls 元素數量進行後續操作。若 urls 元素數量為1,則直接通過 Protocol 自適應拓展類即RegistryProtocol類或者DubboProtocol構建 Invoker 實例介面,這得看URL前面的是registry://開頭還是以dubbo://。若 urls 元素數量大於1,即存在多個註冊中心或服務直連 url,此時先根據 url 構建 Invoker。然後再通過 Cluster 合併即merge多個 Invoker,最後調用 ProxyFactory 生成代理類。
RegistryProtocol:在refer方法中首先為 url 設置協議頭,然後根據 url 參數載入註冊中心實例。然後獲取 group 配置,根據 group 配置決定 doRefer 第一個參數的類型。doRefer 方法創建一個 RegistryDirectory 實例,然後生成服務消費者鏈接,通過registry.register方法向註冊中心註冊消費者的鏈接,然後通過directory.subscribe向註冊中心訂閱 providers、configurators、routers 等節點下的數據。完成訂閱後,RegistryDirectory 會收到這幾個節點下的子節點信息。由於一個服務可能部署在多台伺服器上,這樣就會在 providers 產生多個節點,這個時候就需要 Cluster 將多個服務節點合併為一個,並生成一個 Invoker。同樣Invoker創建過程先不分析,後面會拿一章專門介紹。
ProxyFactory:Invoker 創建完畢後,接下來要做的事情是為服務介面生成代理對象。有了代理對象,即可進行遠程調用。代理對象生成的入口方法為的getProxy。獲取需要創建的介面列表,組合成數組。而後將該介面數組傳入 Proxy 的 getProxy 方法獲取 Proxy 子類,然後創建 InvokerInvocationHandler 對象,並將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現 JDK 的 InvocationHandler 介面,具體的用途是攔截介面類調用。可以理解為AOP或攔截器。也就是在獲取該對象之前會調用到Proxy實例而不會調用到服務提供者對應的類。至於如何創建proxy實例,請看後面源碼的註釋。
三、Dubbo源碼
服務引用入口源碼ReferenceBean的getObject方法:
1 public Object getObject() throws Exception { 2 return get(); 3 } 4 5 public synchronized T get() { 6 if (destroyed) { 7 throw new IllegalStateException("Already destroyed!"); 8 } 9 // 檢測 ref 是否為空,為空則通過 init 方法創建 10 if (ref == null) { 11 // init 方法主要用於處理配置,以及調用 createProxy 生成代理類 12 init(); 13 } 14 return ref; 15 }View Code
ReferenceConfig 的 init 進行消費者一方的配置:
對源碼進行了分割,方便理清邏輯
1 private void init() { 2 // 避免重覆初始化 3 if (initialized) { 4 return; 5 } 6 initialized = true; 7 // 檢測介面名合法性 8 if (interfaceName == null || interfaceName.length() == 0) { 9 throw new IllegalStateException("interface not allow null!"); 10 } 11 12 // 檢測 consumer 變數是否為空,為空則創建 13 checkDefault(); 14 appendProperties(this); 15 if (getGeneric() == null && getConsumer() != null) { 16 // 設置 generic 17 setGeneric(getConsumer().getGeneric()); 18 } 19 20 // 檢測是否為泛化介面 21 if (ProtocolUtils.isGeneric(getGeneric())) { 22 interfaceClass = GenericService.class; 23 } else { 24 try { 25 // 載入類 26 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() 27 .getContextClassLoader()); 28 } catch (ClassNotFoundException e) { 29 throw new IllegalStateException(e.getMessage(), e); 30 } 31 checkInterfaceAndMethods(interfaceClass, methods); 32 } 33 34 // -------------------------------分割線1------------------------------ 35 36 // 從系統變數中獲取與介面名對應的屬性值 37 String resolve = System.getProperty(interfaceName); 38 String resolveFile = null; 39 if (resolve == null || resolve.length() == 0) { 40 // 從系統屬性中獲取解析文件路徑 41 resolveFile = System.getProperty("dubbo.resolve.file"); 42 if (resolveFile == null || resolveFile.length() == 0) { 43 // 從指定位置載入配置文件 44 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); 45 if (userResolveFile.exists()) { 46 // 獲取文件絕對路徑 47 resolveFile = userResolveFile.getAbsolutePath(); 48 } 49 } 50 if (resolveFile != null && resolveFile.length() > 0) { 51 Properties properties = new Properties(); 52 FileInputStream fis = null; 53 try { 54 fis = new FileInputStream(new File(resolveFile)); 55 // 從文件中載入配置 56 properties.load(fis); 57 } catch (IOException e) { 58 throw new IllegalStateException("Unload ..., cause:..."); 59 } finally { 60 try { 61 if (null != fis) fis.close(); 62 } catch (IOException e) { 63 logger.warn(e.getMessage(), e); 64 } 65 } 66 // 獲取與介面名對應的配置 67 resolve = properties.getProperty(interfaceName); 68 } 69 } 70 if (resolve != null && resolve.length() > 0) { 71 // 將 resolve 賦值給 url 72 url = resolve; 73 } 74 75 // -------------------------------分割線2------------------------------ 76 if (consumer != null) { 77 if (application == null) { 78 // 從 consumer 中獲取 Application 實例,下同 79 application = consumer.getApplication(); 80 } 81 if (module == null) { 82 module = consumer.getModule(); 83 } 84 if (registries == null) { 85 registries = consumer.getRegistries(); 86 } 87 if (monitor == null) { 88 monitor = consumer.getMonitor(); 89 } 90 } 91 if (module != null) { 92 if (registries == null) { 93 registries = module.getRegistries(); 94 } 95 if (monitor == null) { 96 monitor = module.getMonitor(); 97 } 98 } 99 if (application != null) { 100 if (registries == null) { 101 registries = application.getRegistries(); 102 } 103 if (monitor == null) { 104 monitor = application.getMonitor(); 105 } 106 } 107 108 // 檢測 Application 合法性 109 checkApplication(); 110 // 檢測本地存根配置合法性 111 checkStubAndMock(interfaceClass); 112 113 // -------------------------------分割線3------------------------------ 114 115 Map<String, String> map = new HashMap<String, String>(); 116 Map<Object, Object> attributes = new HashMap<Object, Object>(); 117 118 // 添加 side、協議版本信息、時間戳和進程號等信息到 map 中 119 map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); 120 map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); 121 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); 122 if (ConfigUtils.getPid() > 0) { 123 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); 124 } 125 126 // 非泛化服務 127 if (!isGeneric()) { 128 // 獲取版本 129 String revision = Version.getVersion(interfaceClass, version); 130 if (revision != null && revision.length() > 0) { 131 map.put("revision", revision); 132 } 133 134 // 獲取介面方法列表,並添加到 map 中 135 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); 136 if (methods.length == 0) { 137 map.put("methods", Constants.ANY_VALUE); 138 } else { 139 map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); 140 } 141 } 142 map.put(Constants.INTERFACE_KEY, interfaceName); 143 // 將 ApplicationConfig、ConsumerConfig、ReferenceConfig 等對象的欄位信息添加到 map 中 144 appendParameters(map, application); 145 appendParameters(map, module); 146 appendParameters(map, consumer, Constants.DEFAULT_KEY); 147 appendParameters(map, this); 148 149 // -------------------------------分割線4------------------------------ 150 151 String prefix = StringUtils.getServiceKey(map); 152 if (methods != null && !methods.isEmpty()) { 153 // 遍歷 MethodConfig 列表 154 for (MethodConfig method : methods) { 155 appendParameters(map, method, method.getName()); 156 String retryKey = method.getName() + ".retry"; 157 // 檢測 map 是否包含 methodName.retry 158 if (map.containsKey(retryKey)) { 159 String retryValue = map.remove(retryKey); 160 if ("false".equals(retryValue)) { 161 // 添加重試次數配置 methodName.retries 162 map.put(method.getName() + ".retries", "0"); 163 } 164 } 165 166 // 添加 MethodConfig 中的“屬性”欄位到 attributes 167 // 比如 onreturn、onthrow、oninvoke 等 168 appendAttributes(attributes, method, prefix + "." + method.getName()); 169 checkAndConvertImplicitConfig(method, map, attributes); 170 } 171 } 172 173 // -------------------------------✨ 分割線5 ✨------------------------------ 174 175 // 獲取服務消費者 ip 地址 176 String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); 177 if (hostToRegistry == null || hostToRegistry.length() == 0) { 178 hostToRegistry = NetUtils.getLocalHost(); 179 } else if (isInvalidLocalHost(hostToRegistry)) { 180 throw new IllegalArgumentException("Specified invalid registry ip from property..." ); 181 } 182 map.put(Constants.REGISTER_IP_KEY, hostToRegistry); 183 184 // 存儲 attributes 到系統上下文中 185 StaticContext.getSystemContext().putAll(attributes); 186 187 // 創建代理類 188 ref = createProxy(map); 189 190 // 根據服務名,ReferenceConfig,代理類構建 ConsumerModel, 191 // 並將 ConsumerModel 存入到 ApplicationModel 中 192 ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); 193 ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); 194 }View Code
ReferenceConfig 的 createProxy 創建代理對象:
但是不是在這個方法內創建proxy實例,而是對URL進行解析後分三種創建Invoker線路,包括InjvmProtocol中的refer、DubboProtocol的refer與RegistryProtocol中的refer,最後再調用ProxyFactory來對proxy實例進行創建:
1 private T createProxy(Map<String, String> map) { 2 URL tmpUrl = new URL("temp", "localhost", 0, map); 3 final boolean isJvmRefer; 4 if (isInjvm() == null) { 5 // url 配置被指定,則不做本地引用 6 if (url != null && url.length() > 0) { 7 isJvmRefer = false; 8 // 根據 url 的協議、scope 以及 injvm 等參數檢測是否需要本地引用 9 // 比如如果用戶顯式配置了 scope=local,此時 isInjvmRefer 返回 true 10 } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { 11 isJvmRefer = true; 12 } else { 13 isJvmRefer = false; 14 } 15 } else { 16 // 獲取 injvm 配置值 17 isJvmRefer = isInjvm().booleanValue(); 18 } 19 20 // 本地引用 21 if (isJvmRefer) { 22 // 生成本地引用 URL,協議為 injvm 23 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); 24 // 調用 refer 方法構建 InjvmInvoker 實例 25 invoker = refprotocol.refer(interfaceClass, url); 26 27 // 遠程引用 28 } else { 29 // url 不為空,表明用戶可能想進行點對點調用 30 if (url != null && url.length() > 0) { 31 // 當需要配置多個 url 時,可用分號進行分割,這裡會進行切分 32 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); 33 if (us != null && us.length > 0) { 34 for (String u : us) { 35 URL url = URL.valueOf(u); 36 if (url.getPath() == null || url.getPath().length() == 0) { 37 // 設置介面全限定名為 url 路徑 38 url = url.setPath(interfaceName); 39 } 40 41 // 檢測 url 協議是否為 registry,若是,表明用戶想使用指定的註冊中心 42 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 43 // 將 map 轉換為查詢字元串,並作為 refer 參數的值添加到 url 中 44 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 45 } else { 46 // 合併 url,移除服務提供者的一些配置(這些配置來源於用戶配置的 url 屬性), 47 // 比如線程池相關配置。並保留服務提供者的部分配置,比如版本,group,時間戳等 48 // 最後將合併後的配置設置為 url 查詢字元串中。 49 urls.add(ClusterUtils.mergeUrl(url, map)); 50 } 51 } 52 } 53 } else { 54 // 載入註冊中心 url 55 List<URL> us = loadRegistries(false); 56 if (us != null && !us.isEmpty()) { 57 for (URL u : us) { 58 URL monitorUrl = loadMonitor(u); 59 if (monitorUrl != null) { 60 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); 61 } 62 // 添加 refer 參數到 url 中,並將 url 添加到 urls 中 63 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 64 } 65 } 66 67 // 未配置註冊中心,拋出異常 68 if (urls.isEmpty()) { 69 throw new IllegalStateException("No such any registry to reference..."); 70 } 71 } 72 73 // 單個註冊中心或服務提供者(服務直連,下同) 74 if (urls.size() == 1) { 75 // 調用 RegistryProtocol 的 refer 構建 Invoker 實例 76 invoker = refprotocol.refer(interfaceClass, urls.get(0)); 77 78 // 多個註冊中心或多個服務提供者,或者兩者混合 79 } else { 80 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); 81 URL registryURL = null; 82 83 // 獲取所有的 Invoker 84 for (URL url : urls) { 85 // 通過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時 86 // 根據 url 協議頭載入指定的 Protocol 實例,並調用實例的 refer 方法 87 invokers.add(refprotocol.refer(interfaceClass, url)); 88 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 89 registryURL = url; 90 } 91 } 92 if (registryURL != null) { 93 // 如果註冊中心鏈接不為空,則將使用 AvailableCluster 94 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 95 // 創建 StaticDirectory 實例,並由 Cluster 對多個 Invoker 進行合併 96 invoker = cluster.join(new StaticDirectory(u, invokers)); 97 } else { 98 invoker = cluster.join(new StaticDirectory(invokers)); 99 } 100 } 101 } 102 103 Boolean c = check; 104 if (c == null && consumer != null) { 105 c = consumer.isCheck(); 106 } 107 if (c == null) { 108 c = true; 109 } 110 111 // invoker 可用性檢查 112 if (c && !invoker.isAvailable()) { 113 throw new IllegalStateException("No provider available for the service..."); 114 } 115 116 // 生成代理類 117 return (T) proxyFactory.getProxy(invoker); 118 }View Code
同樣Invoker的創建後面會專門拿一篇來講。暫時先把Invoker創建看成一個黑盒,只要我們調用即可。
RegistryProtocol中的refer:
1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 2 // 取 registry 參數值,並將其設置為協議頭 3 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); 4 // 獲取註冊中心實例 5 Registry registry = registryFactory.getRegistry(url); 6 if (RegistryService.class.equals(type)) { 7 return proxyFactory.getInvoker((T) registry, type, url); 8 } 9 10 // 將 url 查詢字元串轉為 Map 11 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); 12 // 獲取 group 配置 13 String group = qs.get(Constants.GROUP_KEY); 14 if (group != null && group.length() > 0) { 15 if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 16 || "*".equals(group)) { 17 // 通過 SPI 載入 MergeableCluster 實例,並調用 doRefer 繼續執行服務引用邏輯 18 return doRefer(getMergeableCluster(), registry, type, url); 19 } 20 } 21 22 // 調用 doRefer 繼續執行服務引用邏輯 23 return doRefer(cluster, registry, type, url); 24 } 25 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { 26 // 創建 RegistryDirectory 實例 27 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); 28 // 設置註冊中心和協議 29 directory.setRegistry(registry); 30 directory.setProtocol(protocol); 31 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); 32 // 生成服務消費者鏈接 33 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); 34 35 // 註冊服務消費者,在 consumers 目錄下新節點 36 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) 37 && url.getParameter(Constants.REGISTER_KEY, true)) { 38 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, 39 Constants.CHECK_KEY, String.valueOf(false))); 40 } 41 42 // 訂閱 providers、configurators、routers 等節點數據 43 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 44 Constants.PROVIDERS_CATEGORY 45 + "," + Constants.CONFIGURATORS_CATEGORY 46 + "," + Constants.ROUTERS_CATEGORY)); 47 48 // 一個註冊中心可能有多個服務提供者,因此這裡需要將多個服務提供者合併為一個 49 Invoker invoker = cluster.join(directory); 50 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); 51 return invoker; 52 }View Code
在Invoker創建完後會返回到ReferenceConfig中,然後進入ProxyFactory中的getProxy方法。
ProxyFactory中的getProxy方法:
1 public <T> T getProxy(Invoker<T> invoker) throws RpcException { 2 // 調用重載方法 3 return getProxy(invoker, false); 4 } 5 6 public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { 7 Class<?>[] interfaces = null; 8 // 獲取介面列表 9 String config = invoker.getUrl().getParameter("interfaces"); 10 if (config != null && config.length() > 0) {