作者:京東物流 王北永 姚再毅 李振 1 背景 目前,ducc實現了實時近乎所有配置動態生效的場景,但是配置是否實時生效,不能直觀展示每個機器上jvm內對象對應的參數是否已變更為準確的值,大部分時候需要查看日誌確認是否生效。 2 技術依賴 1)Jsf:京東RPC框架,用作機器之間的通訊工具 2)re ...
作者:京東物流 王北永 姚再毅 李振
1 背景
目前,ducc實現了實時近乎所有配置動態生效的場景,但是配置是否實時生效,不能直觀展示每個機器上jvm內對象對應的參數是否已變更為準確的值,大部分時候需要查看日誌確認是否生效。
2 技術依賴
1)Jsf:京東RPC框架,用作機器之間的通訊工具
2)redis/redisson:redis,用作配置信息的存儲
3)ZK/Curator: Zookeeper,用作配置信息的存儲 和redis二選一
3)clover:定時任務集群,用作任務延遲或周期性執行
3 實現原理
1)接入方:
各個接入系統通過接入管理模塊獲取token,並指定所在系統發佈的的伺服器ip,用作後續的ip鑒權。當系統啟動時,自動在各個系統生成介面提供方,並註冊到JSF註冊中心。別名需各個系統唯一不重覆。鑒權為統一服務端做IP鑒權。
2)統一配置服務端:
提供按不同接入方、不同系統、不同環境的配置界面。業務人員可設定自動生效時間或者立即生效時間。如果是立刻生效,則通過JSF廣播或者指定機器生效配置。如果是定時生效,則新增定時器並指定生效規則,達到時間後觸發廣播通知。
整個接入方和統一配置服務端的架構如下圖
4 實現步驟
1)重寫JSF類ConsumerConfig類方法refer,將其中的輪訓調用客戶端改為廣播調用客戶端BroadCastClient。
this.client= new BroadCastClient(this);
this.proxyInvoker = new ClientProxyInvoker(this);
ProtocolFactory.check(Constants.ProtocolType.valueOf(this.getProtocol()), Constants.CodecType.valueOf(this.getSerialization()));
this.proxyIns = (T) ProxyFactory.buildProxy(this.getProxy(), this.getProxyClass(), this.proxyInvoker);
2)廣播調用客戶端方法分別獲取當前註冊中心有心跳的服務提供者和已失去連接的機器列表。對統一配置來講,要麼同時失敗,要麼同時成功,判斷如果存在不正常的服務提供方,則不同步。只有全部提供方存在才可以開始廣播配置信息
ConcurrentHashMap<Provider, ClientTransport> concurrentHashMap = this.connectionHolder.getAliveConnections();
ConcurrentHashMap<Provider, ClientTransportConfig> deadConcurrentHashMap = this.connectionHolder.getDeadConnections();
if(deadConcurrentHashMap!=null && deadConcurrentHashMap.size()>0){
log.warn("當前別名{}存在不正常服務提供方數量{},請關註!",msg.getAlias(),deadConcurrentHashMap.size());
throw new RpcException(String.format("當前別名%s存在不正常服務提供方數量%s,請關註!",msg.getAlias(),deadConcurrentHashMap.size()));
}
if(concurrentHashMap.isEmpty()){
log.info("當前別名{}不存在正常服務提供方",msg.getAlias());
throw new RpcException(String.format("當前別名%s不存在正常服務提供方",msg.getAlias()));
}
Iterator aliveConnections = concurrentHashMap.entrySet().iterator();
log.info("當前別名{}存在正常服務提供方數量{}",msg.getAlias(),concurrentHashMap.size());
while (aliveConnections.hasNext()) {
Entry<Provider, ClientTransport> entry = (Entry) aliveConnections.next();
Provider provider = (Provider) entry.getKey();
log.info("當前連接ip={}、port={}、datacenterCode={}",provider.getIp(),provider.getPort(),provider.getDatacenterCode());
ClientTransport connection = (ClientTransport) entry.getValue();
if (connection != null && connection.isOpen()) {
try {
result = super.sendMsg0(new Connection(provider, connection), msg);
} catch (RpcException rpc) {
exception = rpc;
log.warn(rpc.getMessage(), rpc);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
log.warn(e.getMessage(), e);
}
}
}
3)服務配置端,當業務人員配置及時生效或者任務達到時,則根據配置,生成服務調用方,通過統一刷新介面將配置同步刷新到對應的接入系統中,如下圖為操作界面,當增刪改查時,會將屬性增量同步。
服務端在上面操作增刪改時,通過以下方式獲取服務調用方
public static ExcuteAction createJsfConsumer(String alias, String token) {
RegistryConfig jsfRegistry = new RegistryConfig();
jsfRegistry.setIndex("i.jsf.jd.com");
BroadCastConsumerConfig consumerConfig = new BroadCastConsumerConfig<>();
Map<String, String> parameters = new HashMap<>();
parameters.put(".token",token);
consumerConfig.setParameters(parameters);
consumerConfig.setInterfaceId(RefreshRemoteService.class.getName());
consumerConfig.setRegistry(jsfRegistry);
consumerConfig.setProtocol("jsf");
consumerConfig.setAlias(alias);
consumerConfig.setRetries(2);
return new ExcuteAction(consumerConfig);
}
通過以上的配置的客戶端,調用服務提供方方法refreshRemoteService#refresh,將配置信息進行同步到各個接入系統
public void call(Map<String,Object> propertiesValue){
try{
RefreshRemoteService refreshRemoteService = (RefreshRemoteService)consumerConfig.refer();
if(refreshRemoteService!=null){
refreshRemoteService.refresh(propertiesValue);
}
}catch (Exception e){
log.error(e.getMessage());
throw new EasyConfigException(e);
}finally {
consumerConfig.unrefer(); ;
}
}
4)接入方啟動時,需要根據自己配置,將存在redis或者zk的配置一次載入到實例變數中。並註冊刷新介面到JSF註冊中心。
其中註冊刷新介面到JSF註冊中心代碼如下
@Bean(name = "refreshPorpertiesService")
public ProviderConfig createJsfProvider() throws Exception {
ProviderConfig providerConfig = new ProviderConfig();
providerConfig.setId("refreshPorpertiesService");
providerConfig.setInterfaceId(RefreshRemoteService.class.getName());
providerConfig.setRef(new RefreshRemoteServiceDelage(applicationContext));
providerConfig.setTimeout(30000);
providerConfig.setAlias(EasyConfigure.getAppCode()+EasyConfigure.getEnv());
providerConfig.setServer(serverConfig);
providerConfig.setRegistry(jsfRegistry);
providerConfig.setParameter("token", MD5Util.md5(EasyConfigure.getAppCode()));
providerConfig.export();
return providerConfig;
}
其中
RefreshRemoteServiceDelage類提供刷新介面的實際邏輯如下,需判斷當前實例是jdk動態代理還是cglib代理
判斷邏輯如下
if(AopUtils.isJdkDynamicProxy(object)) {
object= AopUtil.getJdkDynamicProxyTargetObject(object);
} else if(AopUtils.isCglibProxy(object)){ //cglib
object= AopUtil.getCglibProxyTargetObject(object);
}
實例對象變數值根據自定義的參數轉換方式轉換後賦值實例變數
if(autoValue.convert()!=null && !autoValue.getClass().isInterface()){
if(!autoValue.convert().newInstance().getInClassType().isAssignableFrom(newVal.getClass()) ){
continue;
}
newVal = autoValue.convert().newInstance().convert(newVal);
if(newVal!=null){
if(!autoValue.convert().newInstance().getOutClassType().isAssignableFrom(newVal.getClass()) ){
continue;
}
field.setAccessible(true);
Object value = ReflectionUtils.getField(field,object);
log.info("change properties{} for object {} before value {}",field.getName(),object.getClass().getName(),value);
ReflectionUtils.setField(field,object,newVal);
log.info("change properties{} for object {} after value {}",field.getName(),object.getClass().getName(),newVal);
}
}
5 實踐
1)pom引入
<dependency>
<groupId>com.jdl</groupId>
<artifactId>easyconfig</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2)配置存儲配置(比如redis方式)
refresh:
config:
appCode: zdzq-worker-appcode
redisUrl: redis://:@127.0.0.1
3)類全局變數需要實時刷新配置,需在類統一指定註解PropertiryChangeListener,實例變數需要增加註解AutoValue並指定數據格式轉換器
@PropertiryChangeListener
public class ChanceServiceImpl implement ChanceService{
@AutoValue(convert = DateConvert.class,alias = "config-id")
private Date signDate;
@AutoValue(convert = SpmKaApply Convert.class,alias = "config-id")
private SpmKaApply spmKaApply;
}
以上convert方法自定義,支持各種複雜配置對象,舉例數據轉換為List如下
public class Convert2 implements Convert<Map<String, String>, Set<String>> {
public Convert2(){}
@Override
public Set<String> convert(Map<String, String> siteInfoMap) {
....你的對象值轉換
}
@Override
public Class<?> getInClassType() {
return Map.class;
}
@Override
public Class<?> getOutClassType() {
return Set.class;
}
接入應用服務啟動後,可訪問/refreshUI 可查看應用在集群中為自動配置的實例,並顯示當前實例中變數值參數。key為實例變數名。
6 總結
1、支持jdk動態代理的實例對象和cglib代理對象的參數動態配置
2、支持定時刷新配置
3、直接查看和驗證應用集群中實例變數是否一致