Hadoop和Spark的Shuffer過程對比解析

来源:https://www.cnblogs.com/ernst/archive/2019/08/05/11305693.html
-Advertisement-
Play Games

Hadoop Shuffer     Hadoop 的shuffer主要分為兩個階段:Map、Reduce。 Map Shuffer:     這個階段發生在map階段之後,數據寫入記憶體之前,在數據寫入記憶體的過程就已經開 ...



Hadoop Shuffer

Hadoop 的shuffer主要分為兩個階段:Map、Reduce。

Map-Shuffer:

這個階段發生在map階段之後,數據寫入記憶體之前,在數據寫入記憶體的過程就已經開始shuffer,通過設置mapreduce.task.io.sort.mb的參數,可改變記憶體的大小,預設為100M。數據在寫入記憶體大於80%時,會發生溢寫spill)過程,將數據整體落地到磁碟,這個過程中預設調用快速排序演算法進行排序,否則調用用戶自定義的 combiner()方法,將數據按照排序的規則分佈在分區。然後進入mapshuffer最後一個階段merge,當磁碟中某一個分區的文件數量>=3個,自動觸發文件合併合併程式,這個過程將一個分區的所有數據進行排序合併成一個文件目錄(歸併演算法),以供reduce抓取。

(k,v,p) :一條數據,其中p是分區號。

Reduce-Shuffer:

​ 通過拷貝線程copy merge中的數據到reduce端,調用歸併演算法,生成一個個Iterator,再通過分組程式,將同一個key的分組放在一起,聚合為一個Iterator。

MR


Spark-Shuffer

Spark HashShuffle 是它以前的版本,現在1.6x 版本默應是 Sort-Based Shuffle。有分散式就一定會有 Shuffle,而且 HashShuffle 是 Spark以前的版本,亦即是 Sort-Based Shuffle 的前身,因為有 HashShuffle 的不足,才會有後續的 Sorted-Based Shuffle,以及現在的 Tungsten-Sort Shuffle。

Spark可以基於記憶體、也可以基於磁碟或者是第三方的儲存空間進行計算:

第一、Spark框架的架構設計和設計模式上是傾向於在記憶體中計算數據的。

第二、這也表達了人們對數據處理的一種美好的願望,就是希望計算數據的時候,數據就在記憶體中

Shuffle 是分散式系統的天敵

Spark 運行分成兩部分,第一部分是 Driver Program,裡面的核心是 SparkContext,它驅動著一個程式的開始,負責指揮,另外一部分是 Worker 節點上的 Task,它是實際運行任務的,當程式運行時,不間斷地由 Driver 與所在的進程進行交互,交互什麼,有幾點,第一、是讓你去乾什麼,第二、是具體告訴 Task 數據在那裡,例如說有三個 Stage,第二個 Task 要拿數據,它就會向 Driver 要數據,所以在整個工作的過程中,Executor 中的 Task 會不斷地與 Driver 進行溝通,這是一個網路傳輸的過程。

Spark架構

關於這種架構有幾點有用的註意事項:

  1. 每個應用程式都有自己的執行程式進程,這些進程在整個應用程式的持續時間內保持不變併在多個線程中運行任務。這樣可以在調度方(每個驅動程式調度自己的任務)和執行方(在不同JVM中運行的不同應用程式中的任務)之間隔離應用程式。但是,這也意味著無法在不將Spark應用程式(SparkContext實例)寫入外部存儲系統的情況下共用數據。
  2. Spark與底層集群管理器無關。只要它可以獲取執行程式進程,並且這些進程相互通信,即使在也支持其他應用程式的集群管理器(例如Mesos / YARN)上運行它也相對容易。
  3. 驅動程式必須在其生命周期內監聽並接受來自其執行程式的傳入連接(例如,請參閱網路配置部分中的spark.driver.port)。因此,驅動程式必須是來自工作節點的網路可定址的。
  4. 因為驅動程式在集群上調度任務,所以它應該靠近工作節點運行,最好是在同一區域網上運行。如果您想遠程向群集發送請求,最好向驅動程式打開RPC並讓它從附近提交操作,而不是遠離工作節點運行驅動程式。

在這個過程中一方面是 Driver 跟 Executor 進行網路傳輸,另一方面是Task要從 Driver 抓取其他上游的 Task 的數據結果,所以有這個過程中就不斷的產生網路結果。其中,下一個 Stage 向上一個 Stage 要數據這個過程,我們就稱之為 Shuffle。

每一個節點計算一部份數據,如果不對各個節點上獨立的部份進行匯聚的話,我們是計算不到最終的結果。這就是因為我們需要利用分散式來發揮它本身並行計算的能力,而後續又需要計算各節點上最終的結果,所以需要把數據匯聚集中,這就會導致 Shuffle,這也是說為什麼 Shuffle 是分散式不可避免的命運。

原始的 HashShuffle 機制

基於 Mapper 和 Reducer 理解的基礎上,當 Reducer 去抓取數據時,它的 Key 到底是怎麼分配的,核心思考點是:作為上游數據是怎麼去分配給下游數據的。在這張圖中你可以看到有4個 Task 在2個 Executors 上面,它們是並行運行的,Hash 本身有一套 Hash演算法,可以把數據的 Key 進行重新分類,每個 Task 對數據進行分類然後把它們不同類別的數據先寫到本地磁碟,然後再經過網路傳輸 Shuffle,把數據傳到下一個 Stage 進行匯聚。

HashShuffle 缺點:

  1. Shuffle前在磁碟上會產生海量的小文件,此時會產生大量耗時低效的 IO 操作 (因為產生過多的小文件)
  2. 記憶體不夠用,由於記憶體中需要保存海量文件操作句柄和臨時信息,如果數據處理的規模比較龐大的話,記憶體不可承受,會出現 OOM 等問題。

優化後的 HashShuffle 機制

有4個Tasks,數據類別還是分成3種類型,因為Hash演算法會根據你的 Key 進行分類,在同一個進程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer里,然後把Buffer中的數據寫入以Core數量為單位的本地文件中,(一個Core只有一種類型的Key的數據)每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這裡有4個Mapper Tasks,總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。Consoldiated Hash-Shuffle的優化有一個很大的好處就是假設現在有200個Mapper Tasks在同一個進程中,也只會產生3個本地小文件; 如果用原始的 Hash-Based Shuffle 的話,200個Mapper Tasks 會各自產生3個本地小文件,在一個進程已經產生了600個本地小文件。

這個優化後的 HashShuffle 叫 ConsolidatedShuffle,在實際生產環境下可以調以下參數:

spark.shuffle.consolidateFiles=true

Consolidated HashShuffle 缺點:

  1. 如果 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小文件。

Shuffle影響Spark性能及調優點

Shuffle 不可以避免是因為在分散式系統中的基本點就是把一個很大的的任務/作業分成一百份或者是一千份,這一百份和一千份文件在不同的機器上獨自完成各自不同的部份,我們是針對整個作業要結果,所以在後面會進行匯聚,這個匯聚的過程的前一階段到後一階段以至網路傳輸的過程就叫 Shuffle。

在 Spark 中為了完成 Shuffle 的過程會把真正的一個作業劃分為不同的 Stage,這個Stage 的劃分是跟據依賴關係去決定的,Shuffle 是整個 Spark 中最消耗性能的一個地方。試試想想如果沒有 Shuffle 的話,Spark可以完成一個純記憶體式的操作。

reduceByKey,它會把每個 Key 對應的 Value 聚合成一個 value 然後生成新的 RDD。

因為在不同節點上我們要進行數據傳輸,數據在通過網路發送之前,要先存儲在記憶體中,記憶體達到一定的程度,它會寫到本地磁碟,(在以前 Spark 的版本它沒有Buffer 的限制,會不斷地寫入 Buffer 然後等記憶體滿了就寫入本地,現在的版本對 Buffer 多少設定了限制,以防止出現 OOM,減少了 IO)。Mapper 端會寫入記憶體 Buffer,這個便關乎到 GC 的問題,然後 Mapper端的 Block 要寫入本地,大量的磁碟與IO的操作和磁碟與網路IO的操作,這就構成了分散式的性能殺手。

如果要對最終計算結果進行排序的話,一般會都會進行 sortByKey,如果以最終結果來思考的話,可以認為是產生了一個很大很大的 partition,可以用 reduceByKey 的時候指定它的並行度,例如把 reduceByKey 的並行度變成為1,新 RDD 的數據切片就變成1,排序一般都會在很多節點上,如果把很多節點變成一個節點然後進行排序,有時候會取得更好的效果,因為數據就在一個節點上,技術層面來講就只需要在一個進程里進行排序。

可以在調用 reduceByKey()接著調用 mapPartition( );
也可以用 repartitionAndSortWithPartitions( );

還有一個地方就是數據傾斜,Shuffle 時會導政數據分佈不均衡。數據傾斜的問題會引申很多其他問題,比如,網路帶寬、各重硬體故障、記憶體過度消耗、文件掉失。因為 Shuffle 的過程中會產生大量的磁碟 IO、網路 IO、以及壓縮、解壓縮、序列化和反序列化等等

Shuffle可能面臨的問題,運行 Task 的時候才會產生 Shuffle (Shuffle 已經融化在 Spark 的運算元中)

  1. 幾千台或者是上萬台的機器進行匯聚計算,數據量會非常大,網路傳輸會很大
  2. 數據如何分類其實就是 partition,即如何 Partition、Hash 、Sort 、計算
  3. 負載均衡 (數據傾斜)
  4. 網路傳輸效率,需要壓縮或解壓縮之間做出權衡,序列化 和 反序列化也是要考慮的問題

具體的 Task 進行計算的時候盡一切最大可能使得數據具備 Process Locality 的特性,退而求其次是增加數據分片,減少每個 Task 處理的數據量**,基於Shuffle 和數據傾斜所導致的一系列問題,可以延伸出很多不同的調優點,比如說:

  • Mapper端的 Buffer 應該設置為多大呢?
  • Reducer端的 Buffer 應該設置為多大呢?如果 Reducer 太少的話,這會限制了抓取多少數據
  • 在數據傳輸的過程中是否有壓縮以及該用什麼方式去壓縮,默應是用 snappy 的壓縮方式。
  • 網路傳輸失敗重試的次數,每次重試之間間隔多少時間。

總結

因為想利用分散式的計算能力,所以要把數據分散到不同節點上運行,上游階段數據是並行運行的,下游階段要進行匯聚,所以出現Shuffle,如果下游分成三類,上游也需要每個Task把數據分成三類,雖然有可能有一類是沒有數據,這無所謂,只要在實際運行時按照這套規則就可以了,這就是最原始的 Shuffle 過程。

Hash-based Shuffle 預設Mapper 階段會為Reducer 階段的每一個Task單獨創建一個文件來保存該Task中要使用的數據,但是在一些情況下(例如說數據量非常龐大的情況) 會造成大量文件的隨機磁碟IO操作且會性成大量的Memory消耗(極易造成OOM)。

  • 原始的 Hash-Shuffle 所產生的小文件: Mapper 端 Task 的個數 x Reduce 端 Task 的數量
  • Consolidated Hash-Shuffle 所產生的小文件: CPU Cores 的個數 x Reduce 端 Task 的數量

Spark Shuffle 說到底都是離不開讀文件、寫文件、為了高效我們需要緩存,由於有很多不同的進程,就需要一個管理者。HashShuffle 適合的埸景是小數據的埸景,對小規模數據的處理效率會比排序後的 Shuffle 高。

區別在於HadoopShuffer是sort-based,spill記憶體大小是100M,Saprk是hash-based(hash-based故名思義也就是在Shuffle的過程中寫數據時不做排序操作,只是將數據根據Hash的結果,將各個Reduce分區的數據寫到各自的磁碟文件中),記憶體大小是32K,Hadoop的Shuffle過程是明顯的幾個階段:map(),spill,merge,shuffle,sort,reduce()等,是按照流程順次執行的,屬於push類型;但是,Spark不一樣,因為Spark的Shuffle過程是運算元驅動的,具有懶執行的特點,屬於pull類型。


參考

Spark性能調優- 第二章:徹底解密Spark的HashShuffle

Hadoop shuffer 和 Spark shuffer區別

Cluster Mode Overview







您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • step1: 安裝虛擬環境: export WORKON_HOME=$HOME/.virtualenvs #指定virtualenvwrapper環境的目錄 export VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3.6 #指定virtualenvwrapper ...
  • postfix 虛擬用戶 postfix + dovecot + openldap 製作虛擬郵件用戶 這裡使用的虛擬郵件用戶的方法是我自己研究的,可能會有不對或則不好的地方,望指出。由於之前已經寫過MariaDB作為資料庫的虛擬用戶,所以這裡會有很多地方會簡化配置和講解,如果沒有看懂可以看看這篇文章 ...
  • 一、查看登錄用戶信息 w [用戶名] 二、Who who 三、查詢當前登錄和過去登陸的用戶信息 last 四、查看所有用戶最後一次登錄時間 lastlog ...
  • Linux強制關掉其他ssh登錄的用戶 首先 用who命令查看登錄的iproot pts/0 162.16.16.155 14:30 0.00s 0.07s 0.05s wroot pts/1 162.16.16.155 14:30 12.00s 0.01s 0.01s -bash 然後who am ...
  • Ctrl+N 當前位置新視窗打開Ctrl+Enter 點擊的文件夾新視窗打開 ...
  • 前言 大概幾個月之前項目中用到事務,需要保證數據的強一致性,期間也用到了mysql的鎖,但當時對mysql的鎖機制只是管中窺豹,所以本文打算總結一下mysql的鎖機制。 本文主要論述關於mysql鎖機制,mysql版本為5.7,引擎為innodb,由於實際中關於innodb鎖相關的知識及加鎖方式很多 ...
  • 概述 以 Hortonworks Data Platform (HDP) 平臺為例 ,hadoop大數據平臺的安全機制包括以下兩個方面: 身份認證 即核實一個使用者的真實身份,一個使用者來使用大數據引擎平臺,這個使用者需要表明自己是誰,即提供自己的身份證明,大數據平臺需要檢驗這個證明,確定這個證明是 ...
  • title: redis login limitation <! more 利用 redis 實現登陸次數限制, 註解 + aop, 核心代碼很簡單. 基本思路 比如希望達到的要求是這樣: 在 1min 內登陸異常次數達到5次, 鎖定該用戶 1h 那麼登陸請求的參數中, 會有一個參數唯一標識一個 u ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...