記一次Flink遇到性能瓶頸

来源:https://www.cnblogs.com/dk168/archive/2023/04/15/17322082.html
-Advertisement-
Play Games

說明 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。同系列文章目錄可見 《記憶體泄漏檢測工具》目錄 1. 使用方式 在 VS 中使用 VLD 的方法可以查看另外一篇博客:在 VS 2015 中使用 VLD。 2. 輸出報告 在 VS 中使用 VLD 時的輸出報告,與在 QT 中使用時是一致的 ...


前言

這周的主要時間花在Flink上面,做了一個簡單的從文本文件中讀取數據,然後存入資料庫的例子,能夠正常的實現功能,但是遇到個問題,我有四台機器,自己搭建了一個standalone的集群,不論我把並行度設置多少,跑起來的耗時都非常接近,實在是百思不得其解。機器多似乎並不能幫助它。 把過程記錄在此,看後面隨著學習的深入能不能解答出這個問題。
image

嘗試過的修複方法

集群搭建

出現這個問題後,我從集群的角度來進行了些修改,
1,機器是2核的,slots被設置成了6,那我就有點懷疑是這個設置問題,因為其實只有2核,設置的多了,反而存在搶占資源,導致運行達不到效果,改成2後效果一樣,沒有改進。這個參數在
taskmanager.numberOfTaskSlots: 2
2,調整記憶體, taskmanager 從2G調整為4G, 效果也沒有變化。
taskmanager.memory.process.size: 4000m
這裡說下這個記憶體,我們設置的是總的Memory,也就是這個Total Process Memory。
image
剔除掉些比較固定的Memory,剩下的大頭就是這個Task Heap 和 Managed Memory。
所以我們調整大小後,它兩個也就相應的增加了。 我查了下這兩個,可以理解為堆記憶體和堆外記憶體,
一個是存放我們程式的對象,會被垃圾回收器回收;一個是堆外記憶體,比如RockDB 和 緩存 sort,hash 等的中間結果。

程式方面修改

最開始的時候我把保存資料庫操作寫在MapFunction裡面,後來改到SinkFunction裡面。
SinkFunction裡面保存資料庫的方法也進行了反覆修改,從開始使用Spring的JdbcTemplate,換成後來直接使用最原始JDBC。 而且還踩了一個坑,開始的時候用的註入的JdbcTemplate, 本地運行沒有問題,到了集群上面,發到別的機器的時候,註入的東西就是空的了。
換成原始的JDBC速度能提升不少, 我猜想這裡的原因是jdbctemplate做了些多餘的事情, JDBC打開一次,後面Invoke的時候就直接存了,效率要高些,所以速度上提升不少。
這裡把部分代碼貼出來, 在Open的時候就預載入好PreparedStatement, Invoke的時候直接傳參數,調用就可以了。

public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
    private PreparedStatement updatePS;
    private PreparedStatement insertPS;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        HikariDataSource dataSource = new HikariDataSource();
        connection = getConnection(dataSource);
        if(connection != null)
        {
            String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
            updatePS = this.connection.prepareStatement(updateSQL);

            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
        if (updatePS != null) {
            updatePS.close();
        }
        if (insertPS != null) {
            insertPS.close();
        }
        //關閉連接和釋放資源
        if (connection != null) {
            connection.close();
        }

    }

    /**
     * 每條數據的插入都要調用一次 invoke() 方法
     *
     * @param marketPrice
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(MarketPrice marketPrice, Context context) throws Exception {

        log.info("start save for {}", marketPrice.getPerformanceId().toString() );

        updatePS.setDouble(1,marketPrice.getOpenPrice());
        updatePS.setDouble(2,marketPrice.getHighPrice());
        updatePS.setDouble(3,marketPrice.getLowPrice());
        updatePS.setDouble(4,marketPrice.getClosePrice());
        updatePS.setString(5, marketPrice.getPerformanceId().toString());
        updatePS.setInt(6, marketPrice.getPriceAsOfDate());
        int result = updatePS.executeUpdate();


        log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);

        if(result == 0)
        {
            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
            insertPS.setString(1, marketPrice.getPerformanceId().toString());
            insertPS.setInt(2, marketPrice.getPriceAsOfDate());
            insertPS.setDouble(3,marketPrice.getOpenPrice());
            insertPS.setDouble(4,marketPrice.getHighPrice());
            insertPS.setDouble(5,marketPrice.getLowPrice());
            insertPS.setDouble(6,marketPrice.getClosePrice());

            result = insertPS.executeUpdate();
            log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
        }
    }

}

總結

從多個方面去改進,結果發現還是一樣的,就是使用一臺機器和使用三台機器,時間上一樣的,再懷疑我只能懷疑是某台機器有問題,然後運行的時候,由最慢的機器決定了速度。 我在使用MapFunction的時候有觀察到,有的時候,某台機器已經處理上千條,而有的只處理了幾十條,到最後完成的時候,大家處理的數量又是很接近的。這樣能夠解釋為什麼機器多了,速度卻是一樣的。但是我沒有辦法找出哪台機器來。 我自己的本地運行,並行數設置的多,速度上面是有提升的,到了集群就碰到這樣的現象,後面看能不能解決它, 先記錄在此。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 哈嘍大家好,我是鹹魚。今天跟大家分享一個關於正則表達式的案例,希望能夠對你有所幫助 案例現象 前幾天有一個小伙伴在群里求助,說他這個 shell 腳本有問題,讓大家幫忙看看 可以看到,這個腳本首先將目標文本文件的名字當作該腳本的第一個參數($1)傳遞進去,然後查看這個文本文件的內容(cat $1), ...
  • 簡介:本文主要介紹ubuntu20.04容器中搭建xfce遠程桌面、C++、Go環境、容器內docker操作配置、zsh配置 一、創建容器 1、創建容器 docker pull ubuntu:20.04docker run -itd --privileged --name=my-desktop--u ...
  • Redis入門 1.初始Redis 1.1認識NoSQL | | SQL(關係型資料庫) | NoSQL(非關係型資料庫) | | | | | | 數據結構 | 結構化(Structured) | 非結構化 | | 數據關聯 | 關聯的(Relational) | 無關聯的 | | 查詢方式 | S ...
  • 1.背景描述 2020年團隊決定對elasticsearch升級。es(elasticsearch縮寫,下同)當前版本為0.9x,升級到5.x版本。es在本公司承載三個部分的業務,站內查詢,訂單數據統計,elk日誌分析。 對於站內查詢和訂單數據統計,當前業務架構是 mysql -> canal -> ...
  • Mysql 中,為什麼 WHERE 使用別名會報錯,而 ORDER BY 不會報錯? 我們先對salary * 12 命名一個別名annual_sal SELECT employee_id,salary,salary * 12 annual_sal FROM employees ORDER BY a ...
  • 1.車系頁佈局渲染現狀 車系頁是重要的車系信息頁面,更新迭代多年,頁面佈局不斷變化,xml佈局文件越寫越複雜。 獲取車系頁佈局文件耗時: startTime = System.currentTimeMillis(); setContentView(R.layout.car_series_revisi ...
  • “我苦心鍛煉了三年,我變禿了,也變強了。” —— 琦玉老師 0x00 大綱 0x01 前言 四個月前,我在《你是來找茬的吧?對自己的博客進行調優》一文中探討了以博客的使用者而不是開發者身份去進行優化,究竟能做到何種程度的問題。當時以 Edge 瀏覽器的開發者工具里的 lighthouse 評分和載入 ...
  • #例子 星巴茲是以擴張速度最快而聞名的咖啡連鎖店。因為擴張速度實在太快,他們著急更新訂單系統,來匹配他們的飲料供應要求。 ##實現1 繼承 購買咖啡時,也可以要求其中加入各種調料,例如:蒸奶,豆漿 很明顯,星巴茲為自己製造了一個維護噩夢,如果牛奶的價錢上揚,怎麼辦?新增一種焦糖調料風味時,怎麼辦 調 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...