Dubbo provider啟動原理: 當我們的dubbo啟動我們的spring容器時spring 初始化容器的時候會查找META-INF/spring.handles文件查找對應的NamespaceHandle,dubbo在其jar包下配置了DubboNamespaceHandle,該類下有以下配 ...
Dubbo provider啟動原理:
當我們的dubbo啟動我們的spring容器時spring 初始化容器的時候會查找META-INF/spring.handles文件查找對應的NamespaceHandle,dubbo在其jar包下配置了DubboNamespaceHandle,該類下有以下配置項:
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true)); registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
意思就是當spring 在解析容器的時候遇到指定配置會使用對應的Parser去解析配置項。
provider
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware
該類主要實現了上面幾個介面,我們來看其中最主要的InitializingBean,該類會在類實例化後調用其中的afterPropertiesSet方法 ,所以我們來看下:
public void afterPropertiesSet() throws Exception { ... //上面一大堆代碼都是判空去重新賦值的代碼,我們不關註他們,最主要是下麵這個export方法 if (!supportedApplicationListener) { export(); } }
public synchronized void export() { checkAndUpdateSubConfigs(); if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } //當設置了延時發佈時用定時器延時發佈 if (delay != null && delay > 0) { delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS); } else { //否則的話直接發佈 doExport(); } }
export方法最主要是判斷是有配置了延時發佈,是的話就用schedule去延時發佈,否的話doExport發佈,在spring中真正幹活的都是do開頭的方法,我們再繼續查看doExport方法
1 protected synchronized void doExport() { 2 if (unexported) { 3 throw new IllegalStateException("Already unexported!"); 4 } 5 if (exported) { 6 return; 7 } 8 exported = true; 9 10 if (path == null || path.length() == 0) { 11 path = interfaceName; 12 } 13 //生成唯一serviceName group/interfaceClass 如group/com.xx.xxx 14 //ref 介面實現bean 服務的真正提供者 15 //interfaceClass 需要發佈的介面服務 16 ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass); 17 ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); 18 //發佈url 19 doExportUrls(); 20 }
這裡主要查看doExportUrls()方法,上面的是把服務信息存到本地map里:
@SuppressWarnings({"unchecked", "rawtypes"}) private void doExportUrls() { //這裡會獲取到註冊中心的列表如有配置多個的話, // 格式:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service&dubbo=2.0.2&pid=16232®istry=zookeeper&release=2.7.0×tamp=1553883173387 List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
所以這裡說明dubbo是支持多協議的多註冊中心的,提前預告,下麵這個doExportUrlsFor1Protocol方法會很長很複雜:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //獲取協議的名稱如dubbo String name = protocolConfig.getName(); if (name == null || name.length() == 0) { //預設是dubbo name = Constants.DUBBO; } Map<String, String> map = new HashMap<String, String>(); //組裝map 再填充到url上 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); appendRuntimeParameters(map); appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); //這裡省略了一堆設置url的代碼,主要是把介面的配置方法加到參數列表裡如果method 重試次數 .... if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } } //是否injvm也就是本地發佈不上註冊中心 if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // export service String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } //獲取綁定的有效IP地址 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); //獲得一個綁定埠 Integer port = this.findConfigedPorts(protocolConfig, name, map); //創建一個url //dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service // &bean.name=com.lin.service.UserService&bind.ip=192.168.1.2&bind.port=20882&dubbo=2.0.2 // &generic=false&group=userGroup&interface=com.lin.service.UserService&methods=add,findUserByName,findUserById // &pid=16232&release=2.7.0&side=provider×tamp=1553884198155 URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured //當scope為none的時候不發佈 if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { // export to local if the config is not remote (export to remote only when config is remote) //當scope不是remote的時候發佈本地服務 if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) //當scope不是local的時候發佈遠程服務 if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && !registryURLs.isEmpty()) { //當註冊中心有多個的時候會發佈到多個註冊中心 for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); //監控url URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } // For providers, this is used to enable custom proxy to generate invoker String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } //proxyFactory是一個自適應的擴展點,所以是一個proxyFactory$Adaptive //預設會有這幾個 //stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper //jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory //javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory //當url里有proxy=xxx的時候就取xxxProxyFactory,如果沒有的話預設就是JavassistProxyFactory Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //創建一個Invoker的包裝類DelegateProviderMetaDataInvoker DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); //protocol是一個自動擴展點,所以會返回一個ivoker.getURL().protocol/Protocol的一個對象 如RegistryProtocol、DubboProtocol Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { //上報元數據中心 metadataReportService.publishProvider(url); } } } this.urls.add(url); }
這裡涉及到一個自適應擴展點的概念,具體什麼是自適應擴展點可以到dubbo官網上看,那裡介紹的很詳細
因為這裡要註冊的URL是registry://192.168.xxxx這樣格式的,又因為protocol又是一個Protocol自適應擴展點
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); Exporter<?> exporter = protocol.export(wrapperInvoker);
所以我們能得到protocol.export這裡裡面是調用的RegistryProtocol裡面的export方法,所以我們再來看下這個方法:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //獲得註冊地址 //zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service... URL registryUrl = getRegistryUrl(originInvoker); // url to export locally //獲取提供者發佈url //dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service&bean.name=... URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker //本地發佈服務 也就是服務發佈到netty容器里 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry //獲得註冊中心地址 final Registry registry = getRegistry(originInvoker); //獲得要註冊的提供者URL final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { //註冊到註冊中心 register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. //訂閱url registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }
我們一步一步來分析上面的代碼,首先我這邊的註冊中心用的是zookeeper所以拿到的註冊地址是zookeeper協議的,服務提供者也是用的預設的dubbo協議,那我們下一步來看下服務真正發佈的方法doLocalExport方法:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl); //這裡protocol又是一個自適應擴展點,所以裡面會調用invoker.getUrl.getProtocol+"Protocol"的export()方法 //如 DubboProtocol.export(); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }
一開始就是一個雙重檢查鎖,我們不管他,直接關註我們的protocol.export方法,跟上面的一樣,protocol也是一個自適應擴展點,所以裡面實際用的是我們invoker.getUrl.getProtocol+"Protocol"的export()方法,我們invoker的url呢又是我們上麵包裝進去的providerUrl也就是dubbo://xxxx這個url,所以最終調用的就是DubboProtocol的export方法:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer(url); optimizeSerialization(url); return exporter; }
上面那些代碼我們不關心先不看了,我們直接可以看到有一個openServer(url)的方法,根據名字我們就可以猜到這裡就是開啟我們服務的地方了,我們來看下:
private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { //創建服務 serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } } }
private ExchangeServer createServer(URL url) { ... url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } ... return server; }
我們可以看到這裡Exchangers.bind(url,requestHandler),這裡呢最終會調到:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //這裡getTransporter也是獲得一個自適應的擴展點,如果沒有配置的話預設就是用的NettyTransporter return getTransporter().bind(url, handler); }
再裡面就是發佈到Netty容器了,有興趣的可以自己去看下,現在這裡我們的服務就已經發佈了,下麵還有註冊到註冊中心,我們再看下我們上面的RegistryProtocol的export方法裡面的註冊服務的代碼:
// url to registry //獲得註冊中心地址 final Registry registry = getRegistry(originInvoker); //獲得要註冊的提供者URL final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { //註冊到註冊中心 register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); }
主要還是register這個方法:
public void register(URL registryUrl, URL registeredProviderUrl) { //因為registryFacotry是一個自適應的擴展點,所以會返回一個zookeeperRegistry,如果是redis://的話就返回一個RedisRegistry Registry registry = registryFactory.getRegistry(registryUrl); //註冊到註冊中心 registry.register(registeredProviderUrl); }
根據環境我們會獲得一個ZookeeperRegistry所以我們再看下zookeeperRegistry的register方法:
因為register這個方法zookeeperRegistry並沒有去實現它,所以一定是在父類的register我們繼續看他父類FailbackRegistry的register方法:
public void register(URL url) { super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // Sending a registration request to the server side //註冊服務 是模版模式,所以會在子類實現 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly addFailedRegistered(url); } }
這方法主要註冊的就是doRegister方法了,因為是模版模式,所以這個方法zookeeperRegistry自己實現了這個方法:
@Override public void doRegister(URL url) { try { //創建一個臨時節點 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
所以這裡就是最終註冊的地方了,這裡會根據url去創建一個服務的臨時節點,到這服務的發佈和註冊就已經完成了,其他地方有興趣的可以自己去看下源碼,dubbo里很多地方都用到了自適應擴展點這個概念,所以如果要看源碼就要先去理解什麼是自適應擴展點。