在上文中介紹了基礎類AbstractRegistry類的解釋,在本篇中將繼續介紹該包下的其他類。 FailbackRegistry 該類繼承了AbstractRegistry,AbstractRegistry中的註冊訂閱等方法,實際上就是一些記憶體緩存的變化,而真正的註冊訂閱的實現邏輯在Failbac ...
在上文中介紹了基礎類AbstractRegistry類的解釋,在本篇中將繼續介紹該包下的其他類。
FailbackRegistry
該類繼承了AbstractRegistry,AbstractRegistry中的註冊訂閱等方法,實際上就是一些記憶體緩存的變化,而真正的註冊訂閱的實現邏輯在FailbackRegistry實現,並且FailbackRegistry提供了失敗重試的機制。
初始化
// Scheduled executor service
// 定時任務執行器
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
// 失敗重試定時器,定時去檢查是否有請求失敗的,如有,無限次重試。
private final ScheduledFuture<?> retryFuture;
// 註冊失敗的URL集合
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
// 取消註冊失敗的URL集合
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
// 訂閱失敗的監聽器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 取消訂閱失敗的監聽器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 通知失敗的URL集合
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
/**
* The time in milliseconds the retryExecutor will wait
*/
// 重試頻率
private final int retryPeriod;
構造函數
public FailbackRegistry(URL url) {
super(url);
// 從url中讀取重試頻率,如果為空,則預設5000ms
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 創建失敗重試定時器
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
//重試
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
構造函數主要是創建了失敗重試的定時器,重試頻率從URL取,如果沒有設置,則預設為5000ms。
在該類中對註冊、取消註冊、訂閱、取消訂閱進行了重寫操作,代碼邏輯相對簡單。
@Override
public void register(URL url) {
super.register(url);
//首先從失敗的緩存中刪除該url
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
// 向註冊中心發送一個註冊請求
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 如果開啟了啟動時檢測,則直接拋出異常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
// 把這個註冊失敗的url放入緩存,並且定時重試。
failedRegistered.add(url);
}
}
在註冊中它會失敗的註冊緩存和失敗的未註冊緩存集合中移除該URL,然後向註冊中心執行註冊。
AbstractRegistryFactory
該類實現了RegistryFactory介面,抽象了createRegistry方法,它實現了Registry的容器。
初始化
private static final ReentrantLock LOCK = new ReentrantLock();
// Registry Collection Map<RegistryAddress, Registry>
// Registry 集合
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
銷毀所有的Registry對象,並清理緩存數據
public static Collection<Registry> getRegistries() {
return Collections.unmodifiableCollection(REGISTRIES.values());
}
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
// Lock up the registry shutdown process
// 獲得鎖
LOCK.lock();
try {
for (Registry registry : getRegistries()) {
try {
// 銷毀
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
// 清空緩存
REGISTRIES.clear();
} finally {
// Release the lock
// 釋放鎖
LOCK.unlock();
}
}
該方法是實現了Registry介面的方法,這裡最要註意的是createRegistry,因為AbstractRegistryFfactory本身就是抽象類,而createRegistry也是抽象方法,為了讓子類只要關註該方法,比如說redis實現的註冊中心和zookeeper實現的註冊中心創建方式肯定不同,而他們相同的一些操作都已經在AbstractRegistryFactory中實現,所以只要關註且實現該抽象方法即可。
@Override
public Registry getRegistry(URL url) {
// 修改url
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
// 計算key值
String key = url.toServiceString();
// Lock the registry access process to ensure a single instance of the registry
// 獲得鎖
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 創建Registry對象
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 添加到緩存。
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
// 釋放鎖
LOCK.unlock();
}
}