dubbo集群概述 dubbo集群功能的切入點在ReferenceConfig.createProxy方法以及Protocol.refer方法中。 在ReferenceConfig.createProxy方法中,如果用戶指定多個提供者url或註冊中心url,那麼會創建多個Invoker,然後用Sta ...
dubbo集群概述
dubbo集群功能的切入點在ReferenceConfig.createProxy方法以及Protocol.refer方法中。
在ReferenceConfig.createProxy方法中,如果用戶指定多個提供者url或註冊中心url,那麼會創建多個Invoker,然後用StaticDirectory將這多個Invoker封裝在一起,然後用相應的Cluster實現類將這個靜態的服務目錄包裝成一個Invoker,每種集群類都對應一種Invoker的集群包裝類,例如,FailoverClusterInvoker,FailbackClusterInvoker,FailfastClusterInvoker,FailsafeClusterInvoker,ForkingClusterInvoker等等,而這些封裝集群邏輯的Invoker包裝類都繼承自AbstractClusterInvoker抽象類。這個抽象類里主要實現了調用時的狀態檢查,Invocation類參數設置,負載均衡,服務提供者可用性檢測等邏輯,而服務調用失敗後的行為邏輯則交由子類實現。
AbstractClusterInvoker.invoke
首先我們從這個方法看起,這個方法是Invoker類的調用入口,
@Override
// 這個方法的主要作用是為調用做一些前置工作,
// 包括檢查狀態,設置參數,從服務目錄取出invoker列表,根據<方法名>.loadbalance參數值獲取相應的負載均衡器
// 最後調用模板方法
public Result invoke(final Invocation invocation) throws RpcException {
// 檢查該Invoker是否已經被銷毀
// 在監聽到註冊中心變更刷新Invoker列表時可能會銷毀不再可用的Invoker
checkWhetherDestroyed();
// binding attachments into invocation.
// 將RpcContext中的參數綁定到invocation上
// 用戶可以通過RpcContext向每次調用傳遞不同的參數
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 列出所有的服務提供者
// 這個方法直接調用服務目錄的list方法
List<Invoker<T>> invokers = list(invocation);
// 根據url中的loadbalance參數值獲取相應的負載均衡器,預設是隨機負載均衡RandomLoadBalance
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
// 添加調用id,唯一標識本次調用
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 模板方法,子類實現
return doInvoke(invocation, invokers, loadbalance);
}
FailoverClusterInvoker.doInvoke
我們以預設的集群類FailoverClusterInvoker為例,分析一下這個類的doInvoke方法
// 這個方法主要實現了重試的邏輯,這也正是這個類的特性,故障轉移功能
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 拷貝一份本地引用,invokers可能會變
List<Invoker<T>> copyInvokers = invokers;
// 檢查提供者列表是否為空
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 獲取調用的方法的retries參數值,重試次數等於該值+1,因為第一次調用不算重試
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
// 迴圈重試
// 記錄最後一次出現的異常
RpcException le = null; // last exception.
// 記錄調用失敗的提供者
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
// 記錄調用過的提供者的地址,
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
// 每次迴圈都要重新檢查狀態,重新列出可用的提供者Invoker,並檢查可用的Invoker是否為空
// 因為這些狀態或提供者信息隨時都可能發生變化
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// 從可用的Invoker列表總選擇一個
// 選擇邏輯中考慮了“粘滯”調用和負載均衡的邏輯
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
// 添加到已經調用的列表中
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
// 對於業務異常直接拋出,這個異常會穿透dubbo框架直接拋給用戶
// 非業務異常例如網路問題,連接斷開,提供者下線等可以通過故障轉移,重試機制解決,
// 這裡之所以直接拋出是因為一旦發生了業務異常就不是dubbo框架能處理的了,再重試也沒有意義了
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
這個方法的邏輯還是比較清晰的,就是重試,這也就是這個這個類的主要功能,故障轉移,如果調用發生異常,就重試調用其他可用的提供者。其中select方法的實現在抽象類AbstractClusterInvoker中。
AbstractClusterInvoker.select
// 這個方法主要實現了“粘滯”調用的邏輯
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();
// 可以通過在url中設置sticky參數的值來決定要不要啟用“粘滯”調用的特性
// 預設不啟用該特性
boolean sticky = invokers.get(0).getUrl()
.getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
//ignore overloaded method
// 如果緩存的粘滯Invoker已經不在可用列表裡了,那麼就應當將其移除
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
//ignore concurrency problem
// 如果啟用了粘滯調用,並且粘滯調用存在,並且粘滯的Invoker不在已經調用失敗的Invoker列表中
// 那麼直接返回粘滯的Invoker
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
// 根據負載均衡策略選擇一個Invoker
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
// 設置粘滯的Invoker
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
這個方法主要實現了“粘滯”調用的邏輯。
AbstractClusterInvoker.doSelect
// 根據負載均衡策略選擇一個Invoker
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
// 根據負載均衡策略選擇一個Invoker
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
// 對於選擇出來的Invoker還要再判斷其可用性
// 對於如下情況需要再次選擇Invoker
// 1. 選出的Invoker在調用失敗列表中
// 2. 設置了可用檢查為true並且選出的Invoker不可用
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
// 重新選擇Invoker, 首先排除調用失敗列表進行選擇,實在不行會去調用失敗列表中看能不能找到又“活過來”的提供者
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
int index = invokers.indexOf(invoker);
try {
//Avoid collision
// 如果沒有重選出新的Invoker,那麼直接用下一個Invoker
invoker = invokers.get((index + 1) % invokers.size());
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
第一次選擇是不考慮調用失敗列表的,所以選出來的Invoker有可能在調用失敗列表中,這時需要進行重選。
AbstractClusterInvoker.reselect
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {
//Allocating one in advance, this list is certain to be used.
List<Invoker<T>> reselectInvokers = new ArrayList<>(
invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
// First, try picking a invoker not in `selected`.
for (Invoker<T> invoker : invokers) {
if (availablecheck && !invoker.isAvailable()) {
continue;
}
// 排除調用失敗列表中的Invoker
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
// 如果還有剩餘的Invoker, 那麼根據負載均衡邏策略選擇一個
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
// Just pick an available invoker using loadbalance policy
// 是在沒有可用的,只能從調用失敗列表中找找看有沒有可用的
// 因為在重試期間有可能之前調用失敗的提供者變成可用的了
if (selected != null) {
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) // available first
&& !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
// 再次選擇
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
// 實在沒有可用的提供者,只能返回null了
return null;
}
其實從這幾個選擇的方法中可以看出來,dubbo的作者還是很用心的,盡最大可能保證調用的成功。
FailfastClusterInvoker
快速失敗,只調用一次,失敗後直接拋異常。代碼很簡單,就不多說了
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
}
FailsafeClusterInvoker
失敗安全的故障處理策略,所謂失敗安全是指在調用失敗後,不拋異常只記錄日誌。
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
// 返回一個空結果,用戶需要對返回結果進行判斷
return new RpcResult(); // ignore
}
}
FailbackClusterInvoker
失敗後記錄下失敗的調用,之後以一定的間隔時間進行重試,這種策略很適合通知類的服務調用。重試間隔固定為5秒, 重試次數可以通過參數設置,預設是3次。
ForkingClusterInvoker
這種策略比較有意思,每次調用都會起多個線程並行第跑,誰先跑出結果就用誰的,這種估計很少用吧,誰這麼財大氣粗,大把大把的資源用來浪費。
不過這很像一些分散式計算框架中的推測執行策略,如果有些任務跑的慢,那麼就會在其他節點也跑這個任務,誰先跑完就用誰的結果,比如spark中就有推測執行的機制。
總結
不同的集群包裝類有不同的故障處理策略,預設的故障轉移,此外常用的有快速失敗,失敗安全,定時重試,合併調用等等。