MongoDB向ES同步數據延遲越來越大,有的已經超過10個小時,造成客戶新加入的用戶無法被搜索出來。由於在系統中ES類似於數倉,很多統計和第三方接系統都需要從ES獲取數據,所以也影響了一些其他依賴ES數據的功能和業務。 ...
接到現場報告,MongoDB向ES同步數據延遲越來越大,有的已經超過10個小時,造成客戶新加入的用戶無法被搜索出來。由於在系統中ES類似於數倉,很多統計和第三方接系統都需要從ES獲取數據,所以也影響了一些其他依賴ES數據的功能和業務。
架構簡圖
tomcat------日誌數據----->logstash-------日誌數據--->| E S
mongodb---業務數據--->mongoshake---業務數據 -->| 集群
日誌通過logstash同步到ES,業務數據通過mongoshake自己實現的ES推送組件推送到ES。ES 5台構成一個集群。
問題現象
- 延遲只發生在第二條通路,第一條通路雖然也有較大併發,但不發生延遲;
- 且延遲在業務峰期才會出現,非高峰期延遲不嚴重或沒有延遲;
- 通過上面的描述可以確定此延遲於業務相關,且與mongoshake推送的實現方式相關;
初步分析
這個結構在兩個異地機房各有一套,中間通過同步機制同步。由於業務數據是從本地mongodb同步到本地ES發生延遲,首先排除是雙機房間傳輸帶寬問題。
會不會是ES在業務高峰期負載過高,造成推送延遲:可能性有,但是較低,因為如果是ES查詢並不慢,且如果是由此造成,logstash 推送也應該受到影響。對ES的性能監控也可以印證這個問題。
會不會是兩種數據業務的不同造成的性能差異:logstash的數據主要是日誌,插入為主,mongoshake的數據是業務數據,涉及到對以前的數據進行修改:確實有可能,但比較起來延遲不應該有如此之大,一個秒級延遲,一個天級延遲,還是首先考慮實現機制問題。
同步機制
mongoshake向ES同步機制,是將需要在ES存放的幾張表的oplog在ES回放,此程式由我們的開發人員擴展的mongoshake ES組件完成。
oplog ----- mongoshake----- oplog replay----->ES
聯合高峰期延遲增加的現象,可以猜測高峰期業務數據操作造成的oplog有大量增加,由於mongoshake本身(除非修改源碼)只能篩選表,不能篩選哪些表的具體日誌,只要是這幾張包的oplog都會同步,所以造成延遲。到底是oplog過多需要篩選,還是同步能力太低需要改進,我們需要進一步查證。
推送能力統計
現場人員查看了一段時間的同步量,對現有機制的oplog處理及回放能力進行統計。
第一次統計:130秒同步 2186
第二次統計:180秒同步 2714
可見平均每秒能力不足20條日誌,肯定是過低。那麼客戶現場實際業務每秒到底要產生多少條數據?這個問題要查清楚,作為推送性能優化的底線。
實際業務oplog情況分析
對客戶業務能力進行統計,需要將一整天的oplog導出,oplog由於沒有索引,雖然可以直接通過find,並給出ts 查詢,但由於有40G數據,查詢及其緩慢 。所以我們選擇將數據導出到文本文件,進行分析
start=$(date +%s -d "2020-03-24 08:00:00")
end=$(date +%s -d "2020-03-24 10:00:00")
mongodump -h localhost -d local -c oplog.rs -q '{"ts":{$gt:Timestamp('$start',1),$lt:Timestamp('$end',1)}}' -o /home/backup
cd /home/backup/local
bsondump oplog.rs.bson > oplog.rs.txt
進一步對oplog進行分析,在vim中分別統計每個小時的日誌數量,可以得到下表:橫軸是北京時間24個小時,縱軸是oplog數量,其中灰色是oplog中需要同步到ES的
可以看出高峰期oplog大量增長,需要同步的日誌超過150000,平均每秒42條oplog需要同步。而處理能力不到20,所以高峰期一個小時的數據往往需要2-3個小時才能同步完成,且從8點開始,一直都下午18點,實際oplog產生都超過處理能力。
往下的優化方向,一個是減少日誌,一個是增加處理能力,高峰期每秒42條日誌已經不高,雖然可以優化,但可優化範圍有限。增加處理能力才是關鍵。
開發查看ES同步代碼,原有代碼使用逐條同步模式,同步一條,獲取一條,同時採用性能較低的腳本同步方式,現在使用批量處理(實現參照mongoshake 向mongodb同步的 direct writer實現。批量調用elastic Client 提供的bulk api進行操作)
bulkRequest := bw.client.Bulk()
for _, log := range oplogs {
......
bulkRequest.Add(elastic.NewBulkDeleteRequest().Index(index).Type(doc_type).Id(id))
}
bulkResponse, err := bulkRequest.Do(context.Background())
對update 中的unset(刪除欄位) 進行處理,因為unset執行有速度有瓶頸,所以根據實際情況直接改為將欄位置空;
for _, v := range unsets {
doc[v] = nil
}
效果
在進行上述操作後,單線程情況下數據處理的速度超過每秒1000條(未嚴格測試),新同步代碼幾分鐘就能同步一個小時的oplog,完全達到性能要求。
討論
oplog優化
由於ES同步性能大幅提高,所以可以不用繼續優化oplog,但是oplog可以反映關鍵業務對資料庫的訪問情況,特別是寫入,在mongodb replica set中只能在primary 節點完成,即使增加節點也無法分擔流量,所以對oplog的進一步分析依然必要。同時oplog中包含數量的大小,也對replicaset 的同步帶寬有影響,特別是出現跨機房同步的情況時。
所以更進一步,我們還對業務oplog進行了分析,此分析我們會在別的文章中討論。
多線程執行
註意這裡ES同步沒有使用多線程處理,主要是考慮業務數據多線程操作的事務性。要實現此種事務,需要對mongodb本身進行一定改造。對mongodb源碼改造實現雙向同步和多線程寫入會在其他文章中討論。