ElasticSearch - 批量更新bulk死鎖問題排查

来源:https://www.cnblogs.com/Jcloud/archive/2023/07/05/17527727.html
-Advertisement-
Play Games

由於商品變更MQ消息量巨大,為了提升更新ES的性能,防止出現MQ消息積壓問題,所以本系統使用了BulkProcessor進行批量非同步更新。 ...


一、問題系統介紹

  1. 監聽商品變更MQ消息,查詢商品最新的信息,調用BulkProcessor批量更新ES集群中的商品欄位信息;

  2. 由於商品數據非常多,所以將商品數據存儲到ES集群上,整個ES集群共劃分了256個分片,並根據商品的三級類目ID進行分片路由。

比如一個SKU的商品名稱發生變化,我們就會收到這個SKU的變更MQ消息,然後再去查詢商品介面,將商品的最新名稱查詢回來,再根據這個SKU的三級分類ID進行路由,找到對應的ES集群分片,然後更新商品名稱欄位信息。

由於商品變更MQ消息量巨大,為了提升更新ES的性能,防止出現MQ消息積壓問題,所以本系統使用了BulkProcessor進行批量非同步更新。

ES客戶端版本如下:

        <dependency>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
            <version>6.5.3</version>
        </dependency>

BulkProcessor配置偽代碼如下:

        //在這裡調用build()方法構造bulkProcessor,在底層實際上是用了bulk的非同步操作
        this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->
                fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
                // 1000條數據請求執行一次bulk
                .setBulkActions(1000)
                // 5mb的數據刷新一次bulk
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // 併發請求數量, 0不併發, 1併發允許執行
                .setConcurrentRequests(1)
                // 固定1s必須刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // 重試5次,間隔1s
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();

二、問題怎麼發現的

  1. 618大促開始後,由於商品變更MQ消息非常頻繁,MQ消息每天的消息量更是達到了日常的數倍,而且好多商品還變更了三級類目ID;

  2. 系統在更新這些三級類目ID發生變化的SKU商品信息時,根據修改後的三級類目ID路由後的分片更新商品信息時發生了錯誤,並且重試了5次,依然沒有成功;

  3. 因為在新路由的分片上沒有這個商品的索引信息,這些更新請求永遠也不會執行成功,系統的日誌文件中也記錄了大量的異常重試日誌。

  4. 商品變更MQ消息也開始出現了積壓報警,MQ消息的消費速度明顯趕不上生產速度。

  5. 觀察MQ消息消費者的UMP監控數據,發現消費性能很平穩,沒有明顯波動,但是調用次數會在系統消費MQ一段時間後出現斷崖式下降,由原來的每分鐘幾萬調用量逐漸下降到個位數。

  6. 在重啟應用後,系統又開始消費,UMP監控調用次數恢復到正常水平,但是系統運行一段時間後,還是會出現消費暫停問題,仿佛所有消費線程都被暫停了一樣。

三、排查問題的詳細過程

首先找一臺暫停消費MQ消息的容器,查看應用進程ID,使用jstack命令dump應用進程的整個線程堆棧信息,將導出的線程堆棧信息打包上傳到 https://fastthread.io/ 進行線程狀態分析。分析報告如下:

通過分析報告發現有124個處於BLOCKED狀態的線程,然後可以點擊查看各線程的詳細堆棧信息,堆棧信息如下:

連續查看多個線程的詳細堆棧信息,MQ消費線程都是在waiting to lock <0x00000005eb781b10> (a org.elasticsearch.action.bulk.BulkProcessor),然後根據0x00000005eb781b10去搜索發現,這個對象鎖正在被另外一個線程占用,占用線程堆棧信息如下:

這個線程狀態此時正處於WAITING狀態,通過線程名稱發現,該線程應該是ES客戶端內部線程。正是該線程搶占了業務線程的鎖,然後又在等待其他條件觸發該線程執行,所以導致了所有的MQ消費業務線程一直無法獲取BulkProcessor內部的鎖,導致出現了消費暫停問題。

但是這個線程elasticsearch[scheduler][T#1]為啥不能執行? 它是什麼時候啟動的? 又有什麼作用?

就需要我們對BulkProcessor進行深入分析,由於BulkProcessor是通過builder模塊進行創建的,所以深入builder源碼,瞭解一下BulkProcessor的創建過程。

public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
        Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(listener, "listener");
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
        return new Builder(consumer, listener,
                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
    }

內部創建了一個時間調度執行線程池,線程命名規則和上述持有鎖的線程名稱相似,具體代碼如下:

static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
                EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

最後在build方法內部執行了BulkProcessor的內部有參構造方法,在構造方法內部啟動了一個周期性執行的flushing任務,代碼如下

 BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
                  Scheduler scheduler, Runnable onClose) {
        this.bulkActions = bulkActions;
        this.bulkSize = bulkSize.getBytes();
        this.bulkRequest = new BulkRequest();
        this.scheduler = scheduler;
        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
        // Start period flushing task after everything is setup
        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
        this.onClose = onClose;
    }
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
        if (flushInterval == null) {
            return new Scheduler.Cancellable() {
                @Override
                public void cancel() {}

                @Override
                public boolean isCancelled() {
                    return true;
                }
            };
        }
        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
    }
class Flush implements Runnable {

        @Override
        public void run() {
            synchronized (BulkProcessor.this) {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                execute();
            }
        }
    }

通過源代碼發現,該flush任務就是在創建BulkProcessor對象時設置的固定時間flush邏輯,當setFlushInterval方法參數生效,就會啟動一個後臺定時flush任務。flush間隔,由setFlushInterval方法參數定義。該flush任務在運行期間,也會搶占BulkProcessor對象鎖,搶到鎖後,才會執行execute方法。具體的方法調用關係源代碼如下:

/**
     * Adds the data from the bytes to be processed by the bulk processor
     */
    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                                          @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
        bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
        executeIfNeeded();
        return this;
    }

    private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }

    // (currently) needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();

        this.bulkRequest = new BulkRequest();
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }

而上述代碼中的add方法,則是由MQ消費業務線程去調用,在該方法上同樣有一個synchronized關鍵字,所以消費MQ業務線程會和flush任務執行線程直接會存在鎖競爭關係。具體MQ消費業務線程調用偽代碼如下:

 @Override
 public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {
            String source = JsonUtil.toString(commonSkuEntity);
            UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            updateRequest.doc(source, XContentType.JSON);
            IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            indexRequest.source(source, XContentType.JSON);
            updateRequest.upsert(indexRequest);
            updateRequest.routing(commonSkuEntity.getCat3().toString());
            fullbulkProcessor.add(updateRequest);
}  

通過以上對線程堆棧分析,發現所有的業務線程都在等待elasticsearch[scheduler][T#1]線程釋放BulkProcessor對象鎖,但是該線程確一直沒有釋放該對象鎖,從而出現了業務線程的死鎖問題。

結合應用日誌文件中出現的大量異常重試日誌,可能與BulkProcessor的異常重試策略有關,然後進一步瞭解BulkProcessor的異常重試代碼邏輯。由於業務線程中提交BulkRequest請求都統一提交到了BulkRequestHandler對象中的execute方法內部進行處理,代碼如下:

public final class BulkRequestHandler {
    private final Logger logger;
    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
    private final BulkProcessor.Listener listener;
    private final Semaphore semaphore;
    private final Retry retry;
    private final int concurrentRequests;

    BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
                       BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
        assert concurrentRequests >= 0;
        this.logger = Loggers.getLogger(getClass());
        this.consumer = consumer;
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        this.retry = new Retry(backoffPolicy, scheduler);
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
    }

    public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
            listener.beforeBulk(executionId, bulkRequest);
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }
            });
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
                toRelease.run();
            }
        }
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
            semaphore.release(this.concurrentRequests);
            return true;
        }
        return false;
    }
}

BulkRequestHandler通過構造方法初始化了一個Retry任務對象,該對象中也傳入了一個Scheduler,且該對象和flush任務中傳入的是同一個線程池,該線程池內部只維護了一個固定線程。而execute方法首先會先根據Semaphore來控制併發執行數量,該併發數量在構建BulkProcessor時通過參數指定,通過上述配置發現該值配置為1。所以每次只允許一個線程執行該方法。即MQ消費業務線程和flush任務線程,同一時間只能有一個線程可以執行。然後下麵在瞭解一下重試任務是如何執行的,具體看如下代碼:

 public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
                            ActionListener<BulkResponse> listener) {
        RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);
        r.execute(bulkRequest);
    }

RetryHandler內部會執行提交bulkRequest請求,同時也會監聽bulkRequest執行異常狀態,然後執行任務重試邏輯,重試代碼如下:

private void retry(BulkRequest bulkRequestForRetry) {
            assert backoff.hasNext();
            TimeValue next = backoff.next();
            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
            Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
            scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
        }

RetryHandler將執行失敗的bulk請求重新交給了內部scheduler線程池去執行,通過以上代碼瞭解,該線程池內部只維護了一個固定線程,同時該線程池可能還會被另一個flush任務去占用執行。所以如果重試邏輯正在執行的時候,此時線程池內的唯一線程正在執行flush任務,則會阻塞重試邏輯執行,重試邏輯不能執行完成,則不會釋放Semaphore,但是由於併發數量配置的是1,所以flush任務線程需要等待其他線程釋放一個Semaphore許可後才能繼續執行。所以此處形成了迴圈等待,導致Semaphore和BulkProcessor對象鎖都無法釋放,從而使得所有的MQ消費業務線程都阻塞在獲取BulkProcessor鎖之前。

同時,在GitHub的ES客戶端源碼客戶端上也能搜索到類似問題,例如: https://github.com/elastic/elasticsearch/issues/47599 ,所以更加印證了之前的猜想,就是因為bulk的不斷重試從而引發了BulkProcessor內部的死鎖問題。

四、如何解決問題

既然前邊已經瞭解到了問題產生的原因,所以就有瞭如下幾種解決方案:

1.升級ES客戶端版本到7.6正式版,後續版本通過將異常重試任務線程池和flush任務線程池進行了物理隔離,從而避免了線程池的競爭,但是需要考慮版本相容性。

2.由於該死鎖問題是由大量異常重試邏輯引起的,可以在不影響業務邏輯的情況取消重試邏輯,該方案可以不需要升級客戶端版本,但是需要評估業務影響,執行失敗的請求可以通過其他其他方式進行業務重試。

如有疏漏不妥之處,歡迎指正!

作者:京東零售 曹志飛

來源:京東雲開發者社區


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • # lvm邏輯捲 ## 前言 > raid磁碟陣列技術,提高硬碟的讀寫效率,以及數據的安全,raid的缺點在於: > 1.當你配置好了raid磁碟陣列組,容量的大小,已經是限定了,如果你存儲的業務非常多,磁碟容量不夠用的問題就會出現,你想要擴容磁碟的空間,就會非常麻煩。 > 2.不同的磁碟分區,相對 ...
  • > 本篇內容主要來源於自己學習的視頻,如有侵權,請聯繫刪除,謝謝。 ### 1、etcd讀請求概覽 etcd是典型的`讀多寫少`存儲,在我們實際業務場景中,讀一般占據2/3以上的請求。一個讀 請求從client通過`Round-robin(輪詢)`負載均衡演算法,選擇一個etcd server節點,發 ...
  • # Shuffle的深入理解 什麼是Shuffle,本意為洗牌,在數據處理領域裡面,意為將數打散。 問題:shuffle一定有網路傳輸嗎?有網路傳輸的一定是Shuffle嗎? ## Shuffle的概念 通過網路將數據傳輸到多台機器,數據被打散,但是有網路傳輸,不一定就有shuffle,Shuffl ...
  • 向量資料庫是一種特殊類型的資料庫,它可以存儲和處理向量數據。向量數據通常用於表示多維度的數據點,例如在機器學習和人工智慧中使用的數據。在向量資料庫中,數據被表示為向量,這些向量可以在多維空間中進行比較和搜索。 ...
  • 隨著業務的發展,[實時場景](https://www.dtstack.com/dtinsight/streamworks?src=szsm)在各個⾏業中變得越來越重要。⽆論是⾦融、電商還是物流,實時數據處理都成為了其中的關鍵環節。Flink 憑藉其強⼤的[流處理特性](https://www.dts ...
  • # 一. MySQL體繫結構 ![](https://tcs-devops.aliyuncs.com/storage/112v957e3962f4a8a6d4d8eb1a194d885fa0?Signature=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJBcHB ...
  • 摘要:隨著雲計算的興起和滲透,雲數倉成為了數倉技術演進的新階段,並且逐漸成為了眾多企業的共同選擇。 本文分享自華為雲社區《從GaussDB(DWS)的技術演進,看數據倉庫的積澱與新生》,作者: 華為雲頭條。 數據驅動著現代商業的發展 今天,無論在製造、零售、物流 還是在互聯網、金融等行業 數據都變得 ...
  • ## 一、問題發現 在一次開發中在sp中使用`MySQL PREPARE`以後,使用`match AGAINST`語句作為`prepare stmt`的參數後,發現執行第二遍call會導致資料庫crash,於是開始動手調查問題發生的原因。 > 註:本次使用的 MySQL 資料庫版本為最新的debug ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...