在之前的 "EurekaClient自動裝配及啟動流程解析" 一文中我們提到過,在構造 類時,會把自身註冊到服務端,本文就來分析一下這個註冊流程 客戶端發起註冊 這個方法中包含的 和`instanceInfo`兩個對象在之前的文章中都已經單獨拿出來了: "Eureka中重要的對象" 服務端接受註冊 ...
在之前的EurekaClient自動裝配及啟動流程解析一文中我們提到過,在構造DiscoveryClient
類時,會把自身註冊到服務端,本文就來分析一下這個註冊流程
客戶端發起註冊
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
這個方法中包含的registrationClient
和instanceInfo
兩個對象在之前的文章中都已經單獨拿出來了:Eureka中重要的對象
服務端接受註冊
服務端接受註冊的Controller在ApplicationResource
類中,這裡需要註意的是普通客戶端註冊時其中參數isReplication
為false,這個參數就是控制集群是否同步的表示
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// 一系列的參數校驗
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// AWS的一些東西,不用細看
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
//註冊
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
接著往下看註冊的處理邏輯
public void register(final InstanceInfo info, final boolean isReplication) {
// 租約過期時間
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 註冊
super.register(info, leaseDuration, isReplication);
//集群複製
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
Lease
租約這個類之前也分析過了,不再展開了
服務端保存註冊信息
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
//獲取鎖
read.lock();
//獲取該實例的註冊信息
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
//如果不存在則添加
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// 當存在這個應用的註冊信息時
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
//使用註冊時間長的一方的應用信息
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
//創建租約
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
//如果存在租約則更新開始時間
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
//添加到最近註冊隊列
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// 獲得應用實例最終狀態
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
- 實例信息是由一個map對象保存的,在這個map中,key是應用的appName,value是另外一個map,在這個map中key是應用的id,而value則是應用的租約信息,map對象如下:
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
- 大體流程為,先查看是否存在該appName的註冊信息,如不存在則創建。
- 接著查看是否存在該id的註冊信息,如果存在判斷客戶端的最後修改時間,記錄最後修改的實例,如果不存在則設置自我保護模式的幾個參數
- 根據實例創建對應的租約信息,然後添加到map對象中
- 添加到最近註冊隊列
- 添加到應用實例覆蓋狀態映射
- 設置應用實例最終狀態,添加最近租約變更隊列,設置緩存等
集群數據同步
當isReplication
屬性為true的時候,就會牽扯到集群信息同步了
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info ,
InstanceStatus newStatus , boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
關於PeerEurekaNode
在之前的文章也提到了,保存了集群節點信息,這裡可以看到是迴圈所有的集群節點,然後排除本身的信息之後調用了replicateInstanceActionsToPeers
方法
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
這裡重點關註Register分支
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
在不關心Eureka自有的調度處理相關的前提下,核心代碼是replicationClient.register(info)
,也就是說,如果當客戶端註冊時指定需要同步集群信息時,Eureka會把這個客戶端再註冊到它所在的集群其它的節點上
服務端發起集群數據同步
在之前得EurekaServer自動裝配及啟動流程解析一文中,我們提到過在初始化服務端的時候會從EurekaServer集群中同步數據,也就是下麵這段代碼:
public class EurekaServerBootstrap {
protected void initEurekaServerContext() throws Exception {
//xxxx
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// 從其他 Eureka-Server 拉取註冊信息
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
數據同步的代碼在syncUp
中
public int syncUp() {
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
//未讀取到註冊信息則開始等待
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 獲取註冊信息
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
//判斷是否可以註冊,這裡基於AWS環境的判斷,如果不是AWS環境直接返回true,故不需要關心這個問題
if (isRegisterable(instance)) {
//註冊
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
這裡的處理流程就是當前EurekaServer獲取到客戶端的註冊信息之後,就會再次調用上邊咱們提到的服務端Controller調用的register
方法,當然這次調用時isReplication
參數就變為true了
本文由博客一文多發平臺 OpenWrite 發佈!