一、SpringCloud 簡介 Spring Cloud 是一系列框架的有序集合如服務發現註冊、配置中心、消息匯流排、負載均衡、熔斷器、數據監控等。 SpringCloud 將多個服務框架組合起來,通過Spring Boot進行再封裝,屏蔽掉了複雜的配置和實現原理,最終給開發者提供了一套簡單易懂、易 ...
一、SpringCloud 簡介
Spring Cloud 是一系列框架的有序集合如服務發現註冊、配置中心、消息匯流排、負載均衡、熔斷器、數據監控等。
SpringCloud 將多個服務框架組合起來,通過Spring Boot進行再封裝,屏蔽掉了複雜的配置和實現原理,最終給開發者提供了一套簡單易懂、易部署和易維護的分散式系統開發工具包。
Spring Cloud是一個基於SpringBoot實現的微服務開發方案,Spring boot 是 Spring 的一套快速配置框架。可以基於spring boot 快速開發單個微服務。
二、NACOS簡介
一個更易於構建雲原生應用的動態服務發現、配置管理和服務管理平臺。
Nacos 致力於幫助您發現、配置和管理微服務。Nacos 提供了一組簡單易用的特性集,幫助您快速實現動態服務發現、服務配置、服務元數據及流量管理。
Nacos 幫助您更敏捷和容易地構建、交付和管理微服務平臺。 Nacos 是構建以“服務”為中心的現代應用架構 (例如微服務範式、雲原生範式) 的服務基礎設施。
1、Nacos中的概念
地域
物理的數據中心,資源創建成功後不能更換。
可用區
同一地域內,電力和網路互相獨立的物理區域。同一可用區內,實例的網路延遲較低。
接入點
地域的某個服務的入口功能變數名稱。
命名空間
用於進行租戶粒度的配置隔離。不同的命名空間下,可以存在相同的 Group 或 Data ID 的配置。Namespace 的常用場景之一是不同環境的配置的區分隔離,例如開發測試環境和生產環境的資源(如配置、服務)隔離等。
配置
在系統開發過程中,開發者通常會將一些需要變更的參數、變數等從代碼中分離出來獨立管理,以獨立的配置文件的形式存在。目的是讓靜態的系統工件或者交付物(如 WAR,JAR 包等)更好地和實際的物理運行環境進行適配。配置管理一般包含在系統部署的過程中,由系統管理員或者運維人員完成。配置變更是調整系統運行時的行為的有效手段。
配置管理
系統配置的編輯、存儲、分發、變更管理、歷史版本管理、變更審計等所有與配置相關的活動。
配置項
一個具體的可配置的參數與其值域,通常以 param-key=param-value 的形式存在。例如我們常配置系統的日誌輸出級別(logLevel=INFO|WARN|ERROR) 就是一個配置項。
配置集
一組相關或者不相關的配置項的集合稱為配置集。在系統中,一個配置文件通常就是一個配置集,包含了系統各個方面的配置。例如,一個配置集可能包含了數據源、線程池、日誌級別等配置項。
配置集 ID
Nacos 中的某個配置集的 ID。配置集 ID 是組織劃分配置的維度之一。Data ID 通常用於組織劃分系統的配置集。一個系統或者應用可以包含多個配置集,每個配置集都可以被一個有意義的名稱標識。Data ID 通常採用類 Java 包(如 com.taobao.tc.refund.log.level)的命名規則保證全局唯一性。此命名規則非強制。
配置分組
Nacos 中的一組配置集,是組織配置的維度之一。通過一個有意義的字元串(如 Buy 或 Trade )對配置集進行分組,從而區分 Data ID 相同的配置集。當您在 Nacos 上創建一個配置時,如果未填寫配置分組的名稱,則配置分組的名稱預設採用 DEFAULT_GROUP 。配置分組的常見場景:不同的應用或組件使用了相同的配置類型,如 database_url 配置和 MQ_topic 配置。
配置快照
Nacos 的客戶端 SDK 會在本地生成配置的快照。當客戶端無法連接到 Nacos Server 時,可以使用配置快照顯示系統的整體容災能力。配置快照類似於 Git 中的本地 commit,也類似於緩存,會在適當的時機更新,但是並沒有緩存過期(expiration)的概念。
服務
通過預定義介面網路訪問的提供給客戶端的軟體功能。
服務名
服務提供的標識,通過該標識可以唯一確定其指代的服務。
服務註冊中心
存儲服務實例和服務負載均衡策略的資料庫。
服務發現
在電腦網路上,(通常使用服務名)對服務下的實例的地址和元數據進行探測,並以預先定義的介面提供給客戶端進行查詢。
元信息
Nacos數據(如配置和服務)描述信息,如服務版本、權重、容災策略、負載均衡策略、鑒權配置、各種自定義標簽 (label),從作用範圍來看,分為服務級別的元信息、集群的元信息及實例的元信息。
應用
用於標識服務提供方的服務的屬性。
服務分組
不同的服務可以歸類到同一分組。
虛擬集群
同一個服務下的所有服務實例組成一個預設集群, 集群可以被進一步按需求劃分,劃分的單位可以是虛擬集群。
實例
提供一個或多個服務的具有可訪問網路地址(IP:Port)的進程。
權重
實例級別的配置。權重為浮點數。權重越大,分配給該實例的流量越大。
健康檢查
以指定方式檢查服務下掛載的實例 (Instance) 的健康度,從而確認該實例 (Instance) 是否能提供服務。根據檢查結果,實例 (Instance) 會被判斷為健康或不健康。對服務發起解析請求時,不健康的實例 (Instance) 不會返回給客戶端。
健康保護閾值
為了防止因過多實例 (Instance) 不健康導致流量全部流向健康實例 (Instance) ,繼而造成流量壓力把健康實例 (Instance) 壓垮並形成雪崩效應,應將健康保護閾值定義為一個 0 到 1 之間的浮點數。當功能變數名稱健康實例數 (Instance) 占總服務實例數 (Instance) 的比例小於該值時,無論實例 (Instance) 是否健康,都會將這個實例 (Instance) 返回給客戶端。這樣做雖然損失了一部分流量,但是保證了集群中剩餘健康實例 (Instance) 能正常工作。
2、Nacos 架構
基礎架構如下:
邏輯架構及組件如下:
- 服務管理:實現服務CRUD,功能變數名稱CRUD,服務健康狀態檢查,服務權重管理等功能
- 配置管理:實現配置管CRUD,版本管理,灰度管理,監聽管理,推送軌跡,聚合數據等功能
- 元數據管理:提供元數據CURD 和打標能力
- 插件機制:實現三個模塊可分可合能力,實現擴展點SPI機制
- 事件機制:實現非同步化事件通知,sdk數據變化非同步通知等邏輯
- 日誌模塊:管理日誌分類,日誌級別,日誌可移植性(尤其避免衝突),日誌格式,異常碼+幫助文檔
- 回調機制:sdk通知數據,通過統一的模式回調用戶處理。介面和數據結構需要具備可擴展性
- 定址模式:解決ip,功能變數名稱,nameserver、廣播等多種定址模式,需要可擴展
- 推送通道:解決server與存儲、server間、server與sdk間推送性能問題
- 容量管理:管理每個租戶,分組下的容量,防止存儲被寫爆,影響服務可用性
- 流量管理:按照租戶,分組等多個維度對請求頻率,長鏈接個數,報文大小,請求流控進行控制
- 緩存機制:容災目錄,本地緩存,server緩存機制。容災目錄使用需要工具
- 啟動模式:按照單機模式,配置模式,服務模式,dns模式,或者all模式,啟動不同的程式+UI
- 一致性協議:解決不同數據,不同一致性要求情況下,不同一致性機制
- 存儲模塊:解決數據持久化、非持久化存儲,解決數據分片問題
- Nameserver:解決namespace到clusterid的路由問題,解決用戶環境與nacos物理環境映射問題
- CMDB:解決元數據存儲,與三方cmdb系統對接問題,解決應用,人,資源關係
- Metrics:暴露標準metrics數據,方便與三方監控系統打通
- Trace:暴露標準trace,方便與SLA系統打通,日誌白平化,推送軌跡等能力,並且可以和計量計費系統打通
- 接入管理:相當於阿裡雲開通服務,分配身份、容量、許可權過程
- 用戶管理:解決用戶管理,登錄,sso等問題
- 許可權管理:解決身份識別,訪問控制,角色管理等問題
- 審計系統:擴展介面方便與不同公司審計系統打通
- 通知系統:核心數據變更,或者操作,方便通過SMS系統打通,通知到對應人數據變更
- OpenAPI:暴露標準Rest風格HTTP介面,簡單易用,方便多語言集成
- Console:易用控制台,做服務管理、配置管理等操作
- SDK:多語言sdk
- Agent:dns-f類似模式,或者與mesh等方案集成
- CLI:命令行對產品進行輕量化管理,像git一樣好用
部署架構如下:
nacos 官網以及幫助文檔和部署手冊:https://nacos.io/zh-cn/index.html
nacos github: https://github.com/alibaba/nacos
三、NACOS源碼分析
1、Nacos註冊源碼分析-Clinet端
cosumer啟動的時候,從nacos server上讀取指定服務名稱的實例列表,緩存到本地記憶體中。
開啟一個定時任務,每隔10s去nacos server上拉取服務列表
nacos的push機制:
通過心跳檢測發現服務提供者出現心態超時的時候,推送一個push消息到consumer,更新本地的緩存數據。
客戶端Client
我們自己的項目在配置了nacos作為註冊中心後,至少要配置這麼一個屬性
spring.cloud.nacos.discovery.server-addr=ip地址:8848 # 從邏輯上看,這個是通過grpc去註冊還是通過http去註冊。false-http1.x註冊 true-gRPC註冊,預設是true,也就是通過gRPC去註冊,畢竟gRPC的性能上要比http1.x高很多 spring.cloud.nacos.discovery.ephemeral=false
這個屬性會讓應用找到nacos的server地址去註冊。如果不配置的話,會一直報錯
springboot的@EnableAutoConfiguration這裡就不再講解了。都到nacos的源碼了,springboot預設是熟悉的。
我們再去打開NacosServiceRegistryAutoConfiguration這個類。
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class }) public class NacosServiceRegistryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosServiceManager nacosServiceManager, NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } }
其中第三個類NacosAutoServiceRegistration
實現了一個抽象類AbstractAutoServiceRegistration
.
public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> { @Override @SuppressWarnings("deprecation") public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); } @Deprecated public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context) .getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); } public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get()) { this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } } }
這裡有實現一個ApplicationListener<WebServerInitializedEvent>的類,這個類是spring的一個監聽事件(觀察者模式),而這個事件就是webserver初始化的時候去觸發的。onApplicationEvent方法調用了bind()方法。而bind()中又調用了start().
start()中有一個register()。而這個register就是NacosServiceRegistry中的register()。
public class NacosServiceRegistry implements ServiceRegistry<Registration> { @Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); Instance instance = getNacosInstanceFromRegistration(registration); try { namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { if (nacosDiscoveryProperties.isFailFast()) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); rethrowRuntimeException(e); } else { log.warn("Failfast is false. {} register failed...{},", serviceId, registration.toString(), e); } } } }
- getNacosInstanceFromRegistration 獲取註冊的實例信息。
private Instance getNacosInstanceFromRegistration(Registration registration) { Instance instance = new Instance(); instance.setIp(registration.getHost()); instance.setPort(registration.getPort()); instance.setWeight(nacosDiscoveryProperties.getWeight()); instance.setClusterName(nacosDiscoveryProperties.getClusterName()); instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled()); instance.setMetadata(registration.getMetadata()); instance.setEphemeral(nacosDiscoveryProperties.isEphemeral()); return instance; }
- namingService.registerInstance(serviceId, group, instance);
clientProxy有3個實現類,NamingClientProxyDelegate、NamingGrpcClientProxy、NamingHttpClientProxy。
這個類的構造方法中有個init(properties)方法,這個方法中給clientProxy賦值了。走的是NamingClientProxyDelegate方法。一般情況下,帶有delegate的方法都是委派模式。
public NacosNamingService(String serverList) throws NacosException { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList); init(properties); } public NacosNamingService(Properties properties) throws NacosException { init(properties); } private void init(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); this.namespace = InitUtils.initNamespaceForNaming(properties); InitUtils.initSerialization(); InitUtils.initWebRootContext(properties); initLogName(properties); this.changeNotifier = new InstancesChangeNotifier(); NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); NotifyCenter.registerSubscriber(changeNotifier); this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties); this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier); } @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); clientProxy.registerService(serviceName, groupName, instance); }
基於http1.x協議註冊
-
NamingClientProxyDelegate.registerService
委派這裡做了一個可執行的判斷
@Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { getExecuteClientProxy(instance).registerService(serviceName, groupName, instance); }
NamingClientProxyDelegate.getExecuteClientProxy
做了一個判斷,配置ephemeral=false就走http,否則grpc。這裡請註意,如果nacos-server還是用的1.x.x版本的話,會報錯的。因為2.x.x增加一個grpc的支持,會額外的多增加一個埠,預設對外提供埠為8848和9848
private NamingClientProxy getExecuteClientProxy(Instance instance) { return instance.isEphemeral() ? grpcClientProxy : httpClientProxy; }
-
NamingHttpClientProxy.registerService
這裡的clientProxy=NamingHttpClientProxy
@Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); beatReactor.addBeatInfo(groupedServiceName, beatInfo); } final Map<String, String> params = new HashMap<String, String>(32); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, groupedServiceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put(IP_PARAM, instance.getIp()); params.put(PORT_PARAM, String.valueOf(instance.getPort())); params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight())); params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled())); params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy())); params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral())); params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata())); reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); }
NamingHttpClientProxy.reqApi
public String reqApi(String api, Map<String, String> params, String method) throws NacosException { return reqApi(api, params, Collections.EMPTY_MAP, method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException { return reqApi(api, params, body, serverListManager.getServerList(), method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException { params.put(CommonParams.NAMESPACE_ID, getNamespaceId()); if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) { throw new NacosException(NacosException.INVALID_PARAM, "no server available"); } NacosException exception = new NacosException(); if (serverListManager.isDomain()) { String nacosDomain = serverListManager.getNacosDomain(); for (int i = 0; i < maxRetry; i++) { try { return callServer(api, params, body, nacosDomain, method); } catch (NacosException e) { exception = e; if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("request {} failed.", nacosDomain, e); } } } } else { Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); for (int i = 0; i < servers.size(); i++) { String server = servers.get(index); try { return callServer(api, params, body, server, method); } catch (NacosException e) { exception = e; if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("request {} failed.", server, e); } } index = (index + 1) % servers.size(); } } NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg()); throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage()); }
serverListManager.isDomain()這個判斷是配置了幾個nacos server的值,如果是一個的話,走if邏輯,如果多餘1個的話,走else邏輯。
else中的servers就是nacos server服務列表,通過Ramdom拿到一個隨機數,然後去callServer(),如果發現其中的一個失敗,那麼index+1 獲取下一個服務節點再去callServer。如果所有的都失敗的話,則拋出錯誤。
NamingHttpClientProxy.callServer
前邊的判斷支線省略,拼接url,拼好了後,進入try邏輯塊中,這裡封裝了一個nacosRestTemplate類。請求完成後,返回一個restResult,拿到了請求結果後,把請求結果code放入了一個交MetricsMonitor的類中了,從代碼上看很明顯是監控相關的類,點擊進去果然發現是prometheus相關的。這裡我們不擴展了,繼續回到主線。
如果返回結果是200的話,把result.content返回去。
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException { long start = System.currentTimeMillis(); long end = 0; String namespace = params.get(CommonParams.NAMESPACE_ID); String group = params.get(CommonParams.GROUP_NAME); String serviceName = params.get(CommonParams.SERVICE_NAME); params.putAll(getSecurityHeaders(namespace, group, serviceName)); Header header = NamingHttpUtil.builderHeader(); String url; if (curServer.startsWith(HTTPS_PREFIX) || curServer.startsWith(HTTP_PREFIX)) { url = curServer + api; } else { if (!InternetAddressUtil.containsPort(curServer)) { curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort; } url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; } try { HttpRestResult<String> restResult = nacosRestTemplate .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())) .observe(end - start); if (restResult.ok()) { return restResult.getData(); } if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) { return StringUtils.EMPTY; } throw new NacosException(restResult.getCode(), restResult.getMessage()); } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to request", e); throw new NacosException(NacosException.SERVER_ERROR, e); } }
-
NacosRestTemplate.exchangeForm
關鍵方法:this.requestClient().execute()
-
public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception { RequestHttpEntity requestHttpEntity = new RequestHttpEntity( header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues); return execute(url, httpMethod, requestHttpEntity, responseType); } private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception { URI uri = HttpUtils.buildUri(url, requestEntity.getQuery()); if (logger.isDebugEnabled()) { logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody()); } ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType); HttpClientResponse response = null; try { response = this.requestClient().execute(uri, httpMethod, requestEntity); return responseHandler.handle(response); } finally { if (response != null) { response.close(); } } }
private final HttpClientRequest requestClient; private final List<HttpClientRequestInterceptor> interceptors = new ArrayList<HttpClientRequestInterceptor>(); public NacosRestTemplate(Logger logger, HttpClientRequest requestClient) { super(logger); this.requestClient = requestClient; } private HttpClientRequest requestClient() { if (CollectionUtils.isNotEmpty(interceptors)) { if (logger.isDebugEnabled()) { logger.debug("Execute via interceptors :{}", interceptors); } return new InterceptingHttpClientRequest(requestClient, interceptors.iterator()); } return requestClient; }
HttpClientBeanHolder.getNacosRestTemplate
典型的雙重檢查鎖。
-
public static NacosRestTemplate getNacosRestTemplate(HttpClientFactory httpClientFactory) { if (httpClientFactory == null) { throw new NullPointerException("httpClientFactory is null"); } String factoryName = httpClientFactory.getClass().getName(); NacosRestTemplate nacosRestTemplate = SINGLETON_REST.get(factoryName); if (nacosRestTemplate == null) { synchronized (SINGLETON_REST) { nacosRestTemplate = SINGLETON_REST.get(factoryName); if (nacosRestTemplate != null) { return nacosRestTemplate; } nacosRestTemplate = httpClientFactory.createNacosRestTemplate(); SINGLETON_REST.put(factoryName, nacosRestTemplate); } } return nacosRestTemplate; }
而NamingHttpClientFactory是一個AbstractHttpClientFactory的實現類,由於NamingHttpClientProxy沒有重寫createNacosRestTemplate方法。所以最終引用的也就是AbstractHttpClientFactory的createNacosRestTemplate方法。
private static final HttpClientFactory HTTP_CLIENT_FACTORY = new NamingHttpClientFactory(); public NacosRestTemplate getNacosRestTemplate() { return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY); } private static class NamingHttpClientFactory extends AbstractHttpClientFactory { @Override protected HttpClientConfig buildHttpClientConfig() { return HttpClientConfig.builder().setConTimeOutMillis(CON_TIME_OUT_MILLIS) .setReadTimeOutMillis(READ_TIME_OUT_MILLIS).setMaxRedirects(MAX_REDIRECTS).build(); } @Override protected Logger assignLogger() { return NAMING_LOGGER; } }
AbstractHttpClientFactory.createNacosRestTemplate
@Override public NacosRestTemplate createNacosRestTemplate() { HttpClientConfig httpClientConfig = buildHttpClientConfig(); final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig); // enable ssl initTls(new BiConsumer<SSLContext, HostnameVerifier>() { @Override public void accept(SSLContext sslContext, HostnameVerifier hostnameVerifier) { clientRequest.setSSLContext(loadSSLContext()); clientRequest.replaceSSLHostnameVerifier(hostnameVerifier); } }, new TlsFileWatcher.FileChangeListener() { @Override public void onChanged(String filePath) { clientRequest.setSSLContext(loadSSLContext()); } }); return new NacosRestTemplate(assignLogger(), clientRequest); }
JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);
可以看到這裡定義了一個JdkHttpClientRequest 。
再往下跟就到java.net.HttpURLConnection的調用,去請求nacos-server的地址,再往下的就不做分析了,進入了http的通訊層了。
最終返回了一個結果,如果是200的話,就註冊成功了。失敗了就會拋出異常。
基於gRPC http2.0的註冊
這裡同樣的從gRPC和http的委派來進行分析
NamingClientProxyDelegate.registerService
代碼上邊已經分析過,我們直接進入gRPC的實現。
@Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { getExecuteClientProxy(instance).registerService(serviceName, groupName, instance); }
-
NamingGrpcClientProxy.registerService
redoService.cacheInstanceForRedo 這個從名稱上看應該是重試機制,
-
@Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance); redoService.cacheInstanceForRedo(serviceName, groupName, instance); doRegisterService(serviceName, groupName, instance); }
-
NamingGrpcRedoService.cacheInstanceForRedo
這裡看起來只是給ConcurrentMap中存放一個redoData,並沒有其他的邏輯,後續可能會用到這個。回到主線,繼續往下走。
-
private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>(); public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) { String key = NamingUtils.getGroupedName(serviceName, groupName); InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance); synchronized (registeredInstances) { registeredInstances.put(key, redoData); } }
-
NamingGrpcClientProxy.doRegisterService
request是根據構造函數封裝的一個實例,requestToServer去進行註冊。
-
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException { InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance); requestToServer(request, Response.class); redoService.instanceRegistered(serviceName, groupName); }
NamingGrpcClientProxy.requestToServer
request.putAllHeader推測是跟許可權校驗相關的,我搭建的沒有設置鑒權,所以都是空的。
然後根據rpcClient去調用request方法。根據超時時間判斷的,這2個分支最終都會進入一個方法,預設是3s的超時時間。
最終返回一個response結果。
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException { try { request.putAllHeader( getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName())); Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout); if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) { throw new NacosException(response.getErrorCode(), response.getMessage()); } if (responseClass.isAssignableFrom(response.getClass())) { return (T) response; } NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName()); } catch (Exception e) { throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e); } throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response"); }
- RpcClient.request
這裡的校驗暫且不看,直切主線, response = this.currentCo
-
-
-