說明 使用 VLD 記憶體泄漏檢測工具輔助開發時整理的學習筆記。同系列文章目錄可見 《記憶體泄漏檢測工具》目錄 1. 使用方式 在 VS 中使用 VLD 的方法可以查看另外一篇博客:在 VS 2015 中使用 VLD。 2. 輸出報告 在 VS 中使用 VLD 時的輸出報告,與在 QT 中使用時是一致的 ...
前言
這周的主要時間花在Flink上面,做了一個簡單的從文本文件中讀取數據,然後存入資料庫的例子,能夠正常的實現功能,但是遇到個問題,我有四台機器,自己搭建了一個standalone的集群,不論我把並行度設置多少,跑起來的耗時都非常接近,實在是百思不得其解。機器多似乎並不能幫助它。 把過程記錄在此,看後面隨著學習的深入能不能解答出這個問題。
嘗試過的修複方法
集群搭建
出現這個問題後,我從集群的角度來進行了些修改,
1,機器是2核的,slots被設置成了6,那我就有點懷疑是這個設置問題,因為其實只有2核,設置的多了,反而存在搶占資源,導致運行達不到效果,改成2後效果一樣,沒有改進。這個參數在
taskmanager.numberOfTaskSlots: 2
2,調整記憶體, taskmanager 從2G調整為4G, 效果也沒有變化。
taskmanager.memory.process.size: 4000m
這裡說下這個記憶體,我們設置的是總的Memory,也就是這個Total Process Memory。
剔除掉些比較固定的Memory,剩下的大頭就是這個Task Heap 和 Managed Memory。
所以我們調整大小後,它兩個也就相應的增加了。 我查了下這兩個,可以理解為堆記憶體和堆外記憶體,
一個是存放我們程式的對象,會被垃圾回收器回收;一個是堆外記憶體,比如RockDB 和 緩存 sort,hash 等的中間結果。
程式方面修改
最開始的時候我把保存資料庫操作寫在MapFunction裡面,後來改到SinkFunction裡面。
SinkFunction裡面保存資料庫的方法也進行了反覆修改,從開始使用Spring的JdbcTemplate,換成後來直接使用最原始JDBC。 而且還踩了一個坑,開始的時候用的註入的JdbcTemplate, 本地運行沒有問題,到了集群上面,發到別的機器的時候,註入的東西就是空的了。
換成原始的JDBC速度能提升不少, 我猜想這裡的原因是jdbctemplate做了些多餘的事情, JDBC打開一次,後面Invoke的時候就直接存了,效率要高些,所以速度上提升不少。
這裡把部分代碼貼出來, 在Open的時候就預載入好PreparedStatement, Invoke的時候直接傳參數,調用就可以了。
public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
private PreparedStatement updatePS;
private PreparedStatement insertPS;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
HikariDataSource dataSource = new HikariDataSource();
connection = getConnection(dataSource);
if(connection != null)
{
String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
updatePS = this.connection.prepareStatement(updateSQL);
String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
insertPS = this.connection.prepareStatement(insertSQL);
}
}
@Override
public void close() throws Exception {
super.close();
if (updatePS != null) {
updatePS.close();
}
if (insertPS != null) {
insertPS.close();
}
//關閉連接和釋放資源
if (connection != null) {
connection.close();
}
}
/**
* 每條數據的插入都要調用一次 invoke() 方法
*
* @param marketPrice
* @param context
* @throws Exception
*/
@Override
public void invoke(MarketPrice marketPrice, Context context) throws Exception {
log.info("start save for {}", marketPrice.getPerformanceId().toString() );
updatePS.setDouble(1,marketPrice.getOpenPrice());
updatePS.setDouble(2,marketPrice.getHighPrice());
updatePS.setDouble(3,marketPrice.getLowPrice());
updatePS.setDouble(4,marketPrice.getClosePrice());
updatePS.setString(5, marketPrice.getPerformanceId().toString());
updatePS.setInt(6, marketPrice.getPriceAsOfDate());
int result = updatePS.executeUpdate();
log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);
if(result == 0)
{
String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
insertPS = this.connection.prepareStatement(insertSQL);
insertPS.setString(1, marketPrice.getPerformanceId().toString());
insertPS.setInt(2, marketPrice.getPriceAsOfDate());
insertPS.setDouble(3,marketPrice.getOpenPrice());
insertPS.setDouble(4,marketPrice.getHighPrice());
insertPS.setDouble(5,marketPrice.getLowPrice());
insertPS.setDouble(6,marketPrice.getClosePrice());
result = insertPS.executeUpdate();
log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
}
}
}
總結
從多個方面去改進,結果發現還是一樣的,就是使用一臺機器和使用三台機器,時間上一樣的,再懷疑我只能懷疑是某台機器有問題,然後運行的時候,由最慢的機器決定了速度。 我在使用MapFunction的時候有觀察到,有的時候,某台機器已經處理上千條,而有的只處理了幾十條,到最後完成的時候,大家處理的數量又是很接近的。這樣能夠解釋為什麼機器多了,速度卻是一樣的。但是我沒有辦法找出哪台機器來。 我自己的本地運行,並行數設置的多,速度上面是有提升的,到了集群就碰到這樣的現象,後面看能不能解決它, 先記錄在此。