dubbo+zipkin調用鏈監控(二)

来源:https://www.cnblogs.com/ASPNET2008/archive/2018/10/08/9757980.html
-Advertisement-
Play Games

去年的時候寫過dubbo+zipkin調用鏈監控,最近看到zipkin2配合brave實現起來會比我之前的實現要簡單很多,因為brave將很多交互的內容都封裝起來了,不需要自己去寫具體的實現,比如如何去構建span,如何去上報數據。 收集器抽象 由於zipkin支持http以及kafka兩種方式上報 ...


去年的時候寫過dubbo+zipkin調用鏈監控,最近看到zipkin2配合brave實現起來會比我之前的實現要簡單很多,因為brave將很多交互的內容都封裝起來了,不需要自己去寫具體的實現,比如如何去構建span,如何去上報數據。

收集器抽象

由於zipkin支持http以及kafka兩種方式上報數據,所以在配置上需要做下抽象。

AbstractZipkinCollectorConfiguration

主要是針對下麵兩種收集方式的一些配置上的定義,最核心的是Sender介面的定義,http與kafka是兩類完全不同的實現。

public abstract Sender getSender();

其次是協助性的構造函數,主要是配合構建收集器所需要的一些參數。

  • zipkinUrl

如果是http收集,那麼對應的是zipkin api功能變數名稱,如果是kafka,對應的是kafka集群的地址

  • topic

僅在收集方式為kafka是有效,http時傳空值即可。

public AbstractZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic){
    this.zipkinUrl=zipkinUrl;
    this.serviceName=serviceName;
    this.topic=topic;
    this.tracing=this.tracing();
}

配置上報方式,這裡統一採用異常上傳,並且配置上報的超時時間。

protected AsyncReporter<Span> spanReporter() {
    return AsyncReporter
            .builder(getSender())
            .closeTimeout(500, TimeUnit.MILLISECONDS)
            .build(SpanBytesEncoder.JSON_V2);
}

下麵這兩方法,是配合應用構建span使用的。

註意那個sampler()方法,預設是什麼也不做的意思,我們要想看到數據就需要配置成Sampler.ALWAYS_SAMPLE,這樣才能真正將數據上報到zipkin伺服器。

protected Tracing tracing() {
    this.tracing= Tracing
            .newBuilder()
            .localServiceName(this.serviceName)
            .sampler(Sampler.ALWAYS_SAMPLE)
            .spanReporter(spanReporter())
            .build();
    return this.tracing;
}

protected Tracing getTracing(){
    return this.tracing;
}

HttpZipkinCollectorConfiguration

主要是實現getSender方法,可以借用OkHttpSender這個對象來快速構建,api版本採用v2。

public class HttpZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
    public HttpZipkinCollectorConfiguration(String serviceName,String zipkinUrl) {
        super(serviceName,zipkinUrl,null);
    }

    @Override
    public Sender getSender() {
        return OkHttpSender.create(super.getZipkinUrl()+"/api/v2/spans");
    }
}

OkHttpSender這個類需要引用這個包

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-okhttp3</artifactId>
    <version>${zipkin-reporter2.version}</version>
</dependency>

KafkaZipkinCollectorConfiguration

同樣也是實現getSender方法

public class KafkaZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
    public KafkaZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic) {
        super(serviceName,zipkinUrl,topic);
    }

    @Override
    public Sender getSender() {

        return KafkaSender
                .newBuilder()
                .bootstrapServers(super.getZipkinUrl())
                .topic(super.getTopic())
                .encoding(Encoding.JSON)
                .build();
    }
}

KafkaSender這個類需要引用這個包:

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-kafka11</artifactId>
    <version>${zipkin-reporter2.version}</version>
</dependency>

收集器工廠

由於上面創建了兩個收集器配置類,使用時只能是其中之一,所以實際運行的實例需要根據配置來動態生成。ZipkinCollectorConfigurationFactory就是負責生成收集器實例的。

private final AbstractZipkinCollectorConfiguration zipkinCollectorConfiguration;

@Autowired
public ZipkinCollectorConfigurationFactory(TraceConfig traceConfig){
    if(Objects.equal("kafka", traceConfig.getZipkinSendType())){
        zipkinCollectorConfiguration=new KafkaZipkinCollectorConfiguration(
                traceConfig.getApplicationName(),
                traceConfig.getZipkinUrl(),
                traceConfig.getZipkinKafkaTopic());
    }
    else {
        zipkinCollectorConfiguration = new HttpZipkinCollectorConfiguration(
                traceConfig.getApplicationName(),
                traceConfig.getZipkinUrl());
    }
}

通過構建函數將我們的配置類TraceConfig註入進來,然後根據發送方式來構建實例。另外提供一個輔助函數:

public Tracing getTracing(){
    return this.zipkinCollectorConfiguration.getTracing();
}

過濾器

在dubbo的過濾器中實現數據上傳的功能邏輯相對簡單,一般都在invoke方法執行前記錄數據,然後方法執行完成後再次記錄數據。這個邏輯不變,有變化的是數據上報的實現,上一個版本是通過發http請求實現需要編碼,現在可以直接借用brave所提供的span來幫助我們完成,有兩重要的方法:

  • finish

方法源碼如下,在完成的時候會填寫上完成的時間並上報數據,這一般應用於同步調用場景。

public void finish(TraceContext context, long finishTimestamp) {
    MutableSpan span = this.spanMap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish(Long.valueOf(finishTimestamp));
            this.reporter.report(span.toSpan());
        }
    }
}
  • flush 與上面finish方法的不同點在於,在報數據時沒有完成時間,這應該是適用於一些非同步調用但不關心結果的場景,比如dubbo所提供的oneway方式調用。
public void flush(TraceContext context) {
    MutableSpan span = this.spanMap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish((Long)null);
            this.reporter.report(span.toSpan());
        }
    }
}

消費者

做為消費方,有一個核心功能就是將traceId以及spanId傳遞到服務提供方,這裡還是通過dubbo提供的附加參數方式實現。

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if(!RpcTraceContext.getTraceConfig().isEnabled()){
        return invoker.invoke(invocation);
    }

    ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
            SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
    Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();

    if(null==RpcTraceContext.getTraceId()){
        RpcTraceContext.start();
        RpcTraceContext.setTraceId(IdUtils.get());
        RpcTraceContext.setParentId(null);
        RpcTraceContext.setSpanId(IdUtils.get());
    }
    else {
        RpcTraceContext.setParentId(RpcTraceContext.getSpanId());
        RpcTraceContext.setSpanId(IdUtils.get());
    }
    TraceContext traceContext= TraceContext.newBuilder()
            .traceId(RpcTraceContext.getTraceId())
            .parentId(RpcTraceContext.getParentId())
            .spanId(RpcTraceContext.getSpanId())
            .sampled(true)
            .build();

    Span span=tracer.toSpan(traceContext).start();

    invocation.getAttachments().put(RpcTraceContext.TRACE_ID_KEY, String.valueOf(span.context().traceId()));
    invocation.getAttachments().put(RpcTraceContext.SPAN_ID_KEY, String.valueOf(span.context().spanId()));

    Result result = invoker.invoke(invocation);

    span.finish();

    return result;
}

提供者

@Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if(!RpcTraceContext.getTraceConfig().isEnabled()){
            return invoker.invoke(invocation);
        }

        Map<String, String> attaches = invocation.getAttachments();
        if (!attaches.containsKey(RpcTraceContext.TRACE_ID_KEY)){
            return invoker.invoke(invocation);
        }

        Long traceId = Long.valueOf(attaches.get(RpcTraceContext.TRACE_ID_KEY));
        Long spanId = Long.valueOf(attaches.get(RpcTraceContext.SPAN_ID_KEY));

        attaches.remove(RpcTraceContext.TRACE_ID_KEY);
        attaches.remove(RpcTraceContext.SPAN_ID_KEY);
        RpcTraceContext.start();
        RpcTraceContext.setTraceId(traceId);
        RpcTraceContext.setParentId(spanId);
        RpcTraceContext.setSpanId(IdUtils.get());

        ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
                SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
        Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();

        TraceContext traceContext= TraceContext.newBuilder()
                .traceId(RpcTraceContext.getTraceId())
                .parentId(RpcTraceContext.getParentId())
                .spanId(RpcTraceContext.getSpanId())
                .sampled(true)
                .build();
        Span span = tracer.toSpan(traceContext).start();

        Result result = invoker.invoke(invocation);

        span.finish();

        return result;

    }

異常流程

上面無論是消費者的過濾器還是服務提供者的過濾器,均未考慮服務在調用invoker.invoke時出錯的場景,如果出錯,後面的span.finish方法將不會按預期執行,也就記錄不了信息。所以需要針對此問題做優化:可以在finally塊中執行finish方法。

try {
    result = invoker.invoke(invocation);
}
finally {
    span.finish();
}

消費者在調用服務時,非同步調用問題

上面過濾器中調用span.finish都是基於同步模式,而由於dubbo除了同步調用外還提供了兩種調用方式

  • 非同步調用 通過callback機制的非同步

  • oneway

只發起請求並不等待結果的非同步調用,無callback一說

針對上面兩類非同步再加上同步調用,我們要想準確記錄服務真正的時間,需要在消費方的過濾器中做如下處理:

創建一個用於回調的處理類,它的主要目的是為了在回調成功時記錄時間,這裡無論是成功還是失敗。

private class AsyncSpanCallback implements ResponseCallback{

    private Span span;

    public AsyncSpanCallback(Span span){
        this.span=span;
    }

    @Override
    public void done(Object o) {
        span.finish();
    }

    @Override
    public void caught(Throwable throwable) {
        span.finish();
    }
}

再在調用invoke方法時,如果是oneway方式,則調用flush方法結果,如果是同步則直接調用finish方法,如果是非同步則在回調時調用finish方法。


Result result = null;
boolean isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
try {
    result = invoker.invoke(invocation);
}
finally {
    if(isOneway) {
        span.flush();
    }
    else if(!isAsync) {
        span.finish();
    }
}

待完善問題

過濾器中生成span的方式應該有更好的方法,還沒有對brave做過多研究,後續想辦法再優化下。另外我測試的場景是consumer調用provider,provider內部再調用provider2,我測試時發現第三步調用傳遞的parentId好像有點小問題,後續需要再確認下。

代碼下載

https://github.com/jiangmin168168/jim-framework


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • [科學上網]Node.js 種子下載器 慶祝 2018 國慶,製作了一個 的種子下載器。爬取頁面,根據頁面的鏈接,破解另外一個網站,下載種子文件,同時使用 模塊提高爬蟲的併發量。項目比較簡單,爬取頁面沒有使用任何爬蟲框架。 "源碼傳送門" 。 的安裝請看我的另外一篇文章, "Node.js 的多版本 ...
  • 1》OL標簽的改良 start type reversed:翻轉排序 2》datalist標簽自動補全的使用 3》progress標簽的使用:進度條 4》meter標簽的應用 5》details展開收縮標簽的使用-子標簽summary(自動帶有展開收縮的效果) 6》mark標簽的應用:高亮顯示文本 ...
  • 隨著flash的沒落,瀏覽器的原生能力的興起。在3D方面WebGL不管從功能還是性能方面都在逐漸加強。2D應用變為3D應用的需求也越來越強烈。 win10的畫圖板支持3D圖片,2d工具photoshop也開始逐步集成了3D工具。 下麵就基於WebGL技術探討一下現在的兩款3D框架。Threejs(h ...
  • 網上很多關於驗證小數的正則表達式,但是很多都不是百分百正確,所以我結合一些前輩的經驗,自己寫了一個。 驗證非0開頭的無限位整數和小數。整數支持無限位,小數點前支持無限位,小數點後最多保留兩位。 js代碼如下: var reg = /^(([^0][0-9]+|0)\.([0-9]{1,2})$)|^ ...
  • 近幾年,微服務架構在後端技術社區大紅大紫,它被認為是IT軟體架構的未來技術方向.我們如何借鑒後端微服務的思想來構建一個現代化前端應用? 在這裡我提供一個可以在產品中真正可以落地的前端微服務解決方案. 微服務化後端前後端對比 後端微服務化的優勢: 1. 複雜度可控: 體積小、複雜度低,每個微服務可由一 ...
  • 在Bootstrap fileinput中移除預覽文件時可以通過配置initialPreviewConfig: [ { url:'deletefile',key:fileid } ] 來同步刪除伺服器上的文件和記錄。但新上傳的文件則需要其他方式來同步刪除伺服器記錄。 在配置中遇到的一些問題,記錄一下 ...
  • 數據流轉 先上一張圖看清 Westore 怎麼解決小程式數據難以管理和維護的問題: 非純組件的話,可以直接省去 triggerEvent 的過程,直接修改 store.data 並且 update,形成縮減版單向數據流。 "Github: https://github.com/dntzhang/we ...
  • 想在黑暗中看清周圍,不可避免地要用到夜視儀。那麼如果是想在黑暗中拍照,又沒有閃光燈,如何才能排到清晰的照片?在CVPR 2018上,英特爾實驗室的Vladlen Koltun和陳啟峰帶領的團隊提出了一種在黑暗中快速成像的系統,效果非常贊。 在暗光下的圖像易受到低信噪比和低亮度的影響。短曝光的照片會出 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...