記一次Flink遇到性能瓶頸

来源:https://www.cnblogs.com/dk168/archive/2023/04/15/17322082.html
-Advertisement-
Play Games

說明 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。同系列文章目錄可見 《記憶體泄漏檢測工具》目錄 1. 使用方式 在 VS 中使用 VLD 的方法可以查看另外一篇博客:在 VS 2015 中使用 VLD。 2. 輸出報告 在 VS 中使用 VLD 時的輸出報告,與在 QT 中使用時是一致的 ...


前言

這周的主要時間花在Flink上面,做了一個簡單的從文本文件中讀取數據,然後存入資料庫的例子,能夠正常的實現功能,但是遇到個問題,我有四台機器,自己搭建了一個standalone的集群,不論我把並行度設置多少,跑起來的耗時都非常接近,實在是百思不得其解。機器多似乎並不能幫助它。 把過程記錄在此,看後面隨著學習的深入能不能解答出這個問題。
image

嘗試過的修複方法

集群搭建

出現這個問題後,我從集群的角度來進行了些修改,
1,機器是2核的,slots被設置成了6,那我就有點懷疑是這個設置問題,因為其實只有2核,設置的多了,反而存在搶占資源,導致運行達不到效果,改成2後效果一樣,沒有改進。這個參數在
taskmanager.numberOfTaskSlots: 2
2,調整記憶體, taskmanager 從2G調整為4G, 效果也沒有變化。
taskmanager.memory.process.size: 4000m
這裡說下這個記憶體,我們設置的是總的Memory,也就是這個Total Process Memory。
image
剔除掉些比較固定的Memory,剩下的大頭就是這個Task Heap 和 Managed Memory。
所以我們調整大小後,它兩個也就相應的增加了。 我查了下這兩個,可以理解為堆記憶體和堆外記憶體,
一個是存放我們程式的對象,會被垃圾回收器回收;一個是堆外記憶體,比如RockDB 和 緩存 sort,hash 等的中間結果。

程式方面修改

最開始的時候我把保存資料庫操作寫在MapFunction裡面,後來改到SinkFunction裡面。
SinkFunction裡面保存資料庫的方法也進行了反覆修改,從開始使用Spring的JdbcTemplate,換成後來直接使用最原始JDBC。 而且還踩了一個坑,開始的時候用的註入的JdbcTemplate, 本地運行沒有問題,到了集群上面,發到別的機器的時候,註入的東西就是空的了。
換成原始的JDBC速度能提升不少, 我猜想這裡的原因是jdbctemplate做了些多餘的事情, JDBC打開一次,後面Invoke的時候就直接存了,效率要高些,所以速度上提升不少。
這裡把部分代碼貼出來, 在Open的時候就預載入好PreparedStatement, Invoke的時候直接傳參數,調用就可以了。

public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
    private PreparedStatement updatePS;
    private PreparedStatement insertPS;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        HikariDataSource dataSource = new HikariDataSource();
        connection = getConnection(dataSource);
        if(connection != null)
        {
            String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
            updatePS = this.connection.prepareStatement(updateSQL);

            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
        if (updatePS != null) {
            updatePS.close();
        }
        if (insertPS != null) {
            insertPS.close();
        }
        //關閉連接和釋放資源
        if (connection != null) {
            connection.close();
        }

    }

    /**
     * 每條數據的插入都要調用一次 invoke() 方法
     *
     * @param marketPrice
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(MarketPrice marketPrice, Context context) throws Exception {

        log.info("start save for {}", marketPrice.getPerformanceId().toString() );

        updatePS.setDouble(1,marketPrice.getOpenPrice());
        updatePS.setDouble(2,marketPrice.getHighPrice());
        updatePS.setDouble(3,marketPrice.getLowPrice());
        updatePS.setDouble(4,marketPrice.getClosePrice());
        updatePS.setString(5, marketPrice.getPerformanceId().toString());
        updatePS.setInt(6, marketPrice.getPriceAsOfDate());
        int result = updatePS.executeUpdate();


        log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);

        if(result == 0)
        {
            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
            insertPS.setString(1, marketPrice.getPerformanceId().toString());
            insertPS.setInt(2, marketPrice.getPriceAsOfDate());
            insertPS.setDouble(3,marketPrice.getOpenPrice());
            insertPS.setDouble(4,marketPrice.getHighPrice());
            insertPS.setDouble(5,marketPrice.getLowPrice());
            insertPS.setDouble(6,marketPrice.getClosePrice());

            result = insertPS.executeUpdate();
            log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
        }
    }

}

總結

從多個方面去改進,結果發現還是一樣的,就是使用一臺機器和使用三台機器,時間上一樣的,再懷疑我只能懷疑是某台機器有問題,然後運行的時候,由最慢的機器決定了速度。 我在使用MapFunction的時候有觀察到,有的時候,某台機器已經處理上千條,而有的只處理了幾十條,到最後完成的時候,大家處理的數量又是很接近的。這樣能夠解釋為什麼機器多了,速度卻是一樣的。但是我沒有辦法找出哪台機器來。 我自己的本地運行,並行數設置的多,速度上面是有提升的,到了集群就碰到這樣的現象,後面看能不能解決它, 先記錄在此。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 哈嘍大家好,我是鹹魚。今天跟大家分享一個關於正則表達式的案例,希望能夠對你有所幫助 案例現象 前幾天有一個小伙伴在群里求助,說他這個 shell 腳本有問題,讓大家幫忙看看 可以看到,這個腳本首先將目標文本文件的名字當作該腳本的第一個參數($1)傳遞進去,然後查看這個文本文件的內容(cat $1), ...
  • 簡介:本文主要介紹ubuntu20.04容器中搭建xfce遠程桌面、C++、Go環境、容器內docker操作配置、zsh配置 一、創建容器 1、創建容器 docker pull ubuntu:20.04docker run -itd --privileged --name=my-desktop--u ...
  • Redis入門 1.初始Redis 1.1認識NoSQL | | SQL(關係型資料庫) | NoSQL(非關係型資料庫) | | | | | | 數據結構 | 結構化(Structured) | 非結構化 | | 數據關聯 | 關聯的(Relational) | 無關聯的 | | 查詢方式 | S ...
  • 1.背景描述 2020年團隊決定對elasticsearch升級。es(elasticsearch縮寫,下同)當前版本為0.9x,升級到5.x版本。es在本公司承載三個部分的業務,站內查詢,訂單數據統計,elk日誌分析。 對於站內查詢和訂單數據統計,當前業務架構是 mysql -> canal -> ...
  • Mysql 中,為什麼 WHERE 使用別名會報錯,而 ORDER BY 不會報錯? 我們先對salary * 12 命名一個別名annual_sal SELECT employee_id,salary,salary * 12 annual_sal FROM employees ORDER BY a ...
  • 1.車系頁佈局渲染現狀 車系頁是重要的車系信息頁面,更新迭代多年,頁面佈局不斷變化,xml佈局文件越寫越複雜。 獲取車系頁佈局文件耗時: startTime = System.currentTimeMillis(); setContentView(R.layout.car_series_revisi ...
  • “我苦心鍛煉了三年,我變禿了,也變強了。” —— 琦玉老師 0x00 大綱 0x01 前言 四個月前,我在《你是來找茬的吧?對自己的博客進行調優》一文中探討了以博客的使用者而不是開發者身份去進行優化,究竟能做到何種程度的問題。當時以 Edge 瀏覽器的開發者工具里的 lighthouse 評分和載入 ...
  • #例子 星巴茲是以擴張速度最快而聞名的咖啡連鎖店。因為擴張速度實在太快,他們著急更新訂單系統,來匹配他們的飲料供應要求。 ##實現1 繼承 購買咖啡時,也可以要求其中加入各種調料,例如:蒸奶,豆漿 很明顯,星巴茲為自己製造了一個維護噩夢,如果牛奶的價錢上揚,怎麼辦?新增一種焦糖調料風味時,怎麼辦 調 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...