去年的時候寫過dubbo+zipkin調用鏈監控,最近看到zipkin2配合brave實現起來會比我之前的實現要簡單很多,因為brave將很多交互的內容都封裝起來了,不需要自己去寫具體的實現,比如如何去構建span,如何去上報數據。 收集器抽象 由於zipkin支持http以及kafka兩種方式上報 ...
去年的時候寫過dubbo+zipkin調用鏈監控,最近看到zipkin2配合brave實現起來會比我之前的實現要簡單很多,因為brave將很多交互的內容都封裝起來了,不需要自己去寫具體的實現,比如如何去構建span,如何去上報數據。
收集器抽象
由於zipkin支持http以及kafka兩種方式上報數據,所以在配置上需要做下抽象。
AbstractZipkinCollectorConfiguration
主要是針對下麵兩種收集方式的一些配置上的定義,最核心的是Sender介面的定義,http與kafka是兩類完全不同的實現。
public abstract Sender getSender();
其次是協助性的構造函數,主要是配合構建收集器所需要的一些參數。
- zipkinUrl
如果是http收集,那麼對應的是zipkin api功能變數名稱,如果是kafka,對應的是kafka集群的地址
- topic
僅在收集方式為kafka是有效,http時傳空值即可。
public AbstractZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic){
this.zipkinUrl=zipkinUrl;
this.serviceName=serviceName;
this.topic=topic;
this.tracing=this.tracing();
}
配置上報方式,這裡統一採用異常上傳,並且配置上報的超時時間。
protected AsyncReporter<Span> spanReporter() {
return AsyncReporter
.builder(getSender())
.closeTimeout(500, TimeUnit.MILLISECONDS)
.build(SpanBytesEncoder.JSON_V2);
}
下麵這兩方法,是配合應用構建span使用的。
註意那個sampler()方法,預設是什麼也不做的意思,我們要想看到數據就需要配置成Sampler.ALWAYS_SAMPLE,這樣才能真正將數據上報到zipkin伺服器。
protected Tracing tracing() {
this.tracing= Tracing
.newBuilder()
.localServiceName(this.serviceName)
.sampler(Sampler.ALWAYS_SAMPLE)
.spanReporter(spanReporter())
.build();
return this.tracing;
}
protected Tracing getTracing(){
return this.tracing;
}
HttpZipkinCollectorConfiguration
主要是實現getSender方法,可以借用OkHttpSender這個對象來快速構建,api版本採用v2。
public class HttpZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
public HttpZipkinCollectorConfiguration(String serviceName,String zipkinUrl) {
super(serviceName,zipkinUrl,null);
}
@Override
public Sender getSender() {
return OkHttpSender.create(super.getZipkinUrl()+"/api/v2/spans");
}
}
OkHttpSender這個類需要引用這個包
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-okhttp3</artifactId>
<version>${zipkin-reporter2.version}</version>
</dependency>
KafkaZipkinCollectorConfiguration
同樣也是實現getSender方法
public class KafkaZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
public KafkaZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic) {
super(serviceName,zipkinUrl,topic);
}
@Override
public Sender getSender() {
return KafkaSender
.newBuilder()
.bootstrapServers(super.getZipkinUrl())
.topic(super.getTopic())
.encoding(Encoding.JSON)
.build();
}
}
KafkaSender這個類需要引用這個包:
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-kafka11</artifactId>
<version>${zipkin-reporter2.version}</version>
</dependency>
收集器工廠
由於上面創建了兩個收集器配置類,使用時只能是其中之一,所以實際運行的實例需要根據配置來動態生成。ZipkinCollectorConfigurationFactory就是負責生成收集器實例的。
private final AbstractZipkinCollectorConfiguration zipkinCollectorConfiguration;
@Autowired
public ZipkinCollectorConfigurationFactory(TraceConfig traceConfig){
if(Objects.equal("kafka", traceConfig.getZipkinSendType())){
zipkinCollectorConfiguration=new KafkaZipkinCollectorConfiguration(
traceConfig.getApplicationName(),
traceConfig.getZipkinUrl(),
traceConfig.getZipkinKafkaTopic());
}
else {
zipkinCollectorConfiguration = new HttpZipkinCollectorConfiguration(
traceConfig.getApplicationName(),
traceConfig.getZipkinUrl());
}
}
通過構建函數將我們的配置類TraceConfig註入進來,然後根據發送方式來構建實例。另外提供一個輔助函數:
public Tracing getTracing(){
return this.zipkinCollectorConfiguration.getTracing();
}
過濾器
在dubbo的過濾器中實現數據上傳的功能邏輯相對簡單,一般都在invoke方法執行前記錄數據,然後方法執行完成後再次記錄數據。這個邏輯不變,有變化的是數據上報的實現,上一個版本是通過發http請求實現需要編碼,現在可以直接借用brave所提供的span來幫助我們完成,有兩重要的方法:
- finish
方法源碼如下,在完成的時候會填寫上完成的時間並上報數據,這一般應用於同步調用場景。
public void finish(TraceContext context, long finishTimestamp) {
MutableSpan span = this.spanMap.remove(context);
if(span != null && !this.noop.get()) {
synchronized(span) {
span.finish(Long.valueOf(finishTimestamp));
this.reporter.report(span.toSpan());
}
}
}
- flush 與上面finish方法的不同點在於,在報數據時沒有完成時間,這應該是適用於一些非同步調用但不關心結果的場景,比如dubbo所提供的oneway方式調用。
public void flush(TraceContext context) {
MutableSpan span = this.spanMap.remove(context);
if(span != null && !this.noop.get()) {
synchronized(span) {
span.finish((Long)null);
this.reporter.report(span.toSpan());
}
}
}
消費者
做為消費方,有一個核心功能就是將traceId以及spanId傳遞到服務提供方,這裡還是通過dubbo提供的附加參數方式實現。
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if(!RpcTraceContext.getTraceConfig().isEnabled()){
return invoker.invoke(invocation);
}
ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();
if(null==RpcTraceContext.getTraceId()){
RpcTraceContext.start();
RpcTraceContext.setTraceId(IdUtils.get());
RpcTraceContext.setParentId(null);
RpcTraceContext.setSpanId(IdUtils.get());
}
else {
RpcTraceContext.setParentId(RpcTraceContext.getSpanId());
RpcTraceContext.setSpanId(IdUtils.get());
}
TraceContext traceContext= TraceContext.newBuilder()
.traceId(RpcTraceContext.getTraceId())
.parentId(RpcTraceContext.getParentId())
.spanId(RpcTraceContext.getSpanId())
.sampled(true)
.build();
Span span=tracer.toSpan(traceContext).start();
invocation.getAttachments().put(RpcTraceContext.TRACE_ID_KEY, String.valueOf(span.context().traceId()));
invocation.getAttachments().put(RpcTraceContext.SPAN_ID_KEY, String.valueOf(span.context().spanId()));
Result result = invoker.invoke(invocation);
span.finish();
return result;
}
提供者
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if(!RpcTraceContext.getTraceConfig().isEnabled()){
return invoker.invoke(invocation);
}
Map<String, String> attaches = invocation.getAttachments();
if (!attaches.containsKey(RpcTraceContext.TRACE_ID_KEY)){
return invoker.invoke(invocation);
}
Long traceId = Long.valueOf(attaches.get(RpcTraceContext.TRACE_ID_KEY));
Long spanId = Long.valueOf(attaches.get(RpcTraceContext.SPAN_ID_KEY));
attaches.remove(RpcTraceContext.TRACE_ID_KEY);
attaches.remove(RpcTraceContext.SPAN_ID_KEY);
RpcTraceContext.start();
RpcTraceContext.setTraceId(traceId);
RpcTraceContext.setParentId(spanId);
RpcTraceContext.setSpanId(IdUtils.get());
ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();
TraceContext traceContext= TraceContext.newBuilder()
.traceId(RpcTraceContext.getTraceId())
.parentId(RpcTraceContext.getParentId())
.spanId(RpcTraceContext.getSpanId())
.sampled(true)
.build();
Span span = tracer.toSpan(traceContext).start();
Result result = invoker.invoke(invocation);
span.finish();
return result;
}
異常流程
上面無論是消費者的過濾器還是服務提供者的過濾器,均未考慮服務在調用invoker.invoke時出錯的場景,如果出錯,後面的span.finish方法將不會按預期執行,也就記錄不了信息。所以需要針對此問題做優化:可以在finally塊中執行finish方法。
try {
result = invoker.invoke(invocation);
}
finally {
span.finish();
}
消費者在調用服務時,非同步調用問題
上面過濾器中調用span.finish都是基於同步模式,而由於dubbo除了同步調用外還提供了兩種調用方式
-
非同步調用 通過callback機制的非同步
-
oneway
只發起請求並不等待結果的非同步調用,無callback一說
針對上面兩類非同步再加上同步調用,我們要想準確記錄服務真正的時間,需要在消費方的過濾器中做如下處理:
創建一個用於回調的處理類,它的主要目的是為了在回調成功時記錄時間,這裡無論是成功還是失敗。
private class AsyncSpanCallback implements ResponseCallback{
private Span span;
public AsyncSpanCallback(Span span){
this.span=span;
}
@Override
public void done(Object o) {
span.finish();
}
@Override
public void caught(Throwable throwable) {
span.finish();
}
}
再在調用invoke方法時,如果是oneway方式,則調用flush方法結果,如果是同步則直接調用finish方法,如果是非同步則在回調時調用finish方法。
Result result = null;
boolean isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
try {
result = invoker.invoke(invocation);
}
finally {
if(isOneway) {
span.flush();
}
else if(!isAsync) {
span.finish();
}
}
待完善問題
過濾器中生成span的方式應該有更好的方法,還沒有對brave做過多研究,後續想辦法再優化下。另外我測試的場景是consumer調用provider,provider內部再調用provider2,我測試時發現第三步調用傳遞的parentId好像有點小問題,後續需要再確認下。
代碼下載
https://github.com/jiangmin168168/jim-framework