Spark SQL join的三種實現方式

来源:https://www.cnblogs.com/duodushuduokanbao/archive/2018/11/05/9911256.html
-Advertisement-
Play Games

引言 join是SQL中的常用操作,良好的表結構能夠將數據分散到不同的表中,使其符合某種規範(mysql三大範式),可以最大程度的減少數據冗餘,更新容錯等,而建立表和表之間關係的最佳方式就是join操作。 對於Spark來說有3種Join的實現,每種Join對應的不同的應用場景(SparkSQL自動 ...


引言

join是SQL中的常用操作,良好的表結構能夠將數據分散到不同的表中,使其符合某種規範(mysql三大範式),可以最大程度的減少數據冗餘,更新容錯等,而建立表和表之間關係的最佳方式就是join操作。

對於Spark來說有3種Join的實現,每種Join對應的不同的應用場景(SparkSQL自動決策使用哪種實現範式):

  1.Broadcast Hash Join:適合一張很小的表和一張大表進行Join;

  2.Shuffle Hash Join:適合一張小表(比上一個大一點)和一張大表進行Join;

  2.Sort Merge Join:適合兩張大表進行Join;

前兩者都是基於Hash Join的,只不過Hash Join之前需要先shuffle還是先brocadcast。下麵詳細解釋一下這三種Join的具體原理。

 

Hash Join

先來看看這樣一條SQL語句:select * from order,item where item.id = order.i_id,參與join的兩張表是order和item,join key分別是item.id以及order.i_id。現在假設Join採用的是hash join演算法,整個過程會經歷三步:

  1.確定Build Table以及Probe Table:這個概念比較重要,Build Table會被構建成以join key為key的hash table,而Probe Table使用join key在這張hash table表中尋找符合條件的行,然後進行join鏈接。Build表和Probe表是Spark決定的。通常情況下,小表會被作為Build Table,較大的表會被作為Probe Table。 

  2.構建Hash Table:依次讀取Build Table(item)的數據,對於每一條數據根據Join Key(item.id)進行hash,hash到對應的bucket中(類似於HashMap的原理),最後會生成一張HashTable,HashTable會緩存在記憶體中,如果記憶體放不下會dump到磁碟中。

  3.匹配:生成Hash Table後,在依次掃描Probe Table(order)的數據,使用相同的hash函數(在spark中,實際上就是要使用相同的partitioner)在Hash Table中尋找hash(join key)相同的值,如果匹配成功就將兩者join在一起。

                                                                                            

基礎流程可以參考上圖,這裡有兩個問題需要關註:

  1.hash join性能如何?很顯然,hash join基本都只掃描兩表一次,可以認為O(a+b),較之最極端的是笛卡爾積運算O(a*b);

  2.為什麼Build Table選擇小表?道理很簡單,因為構建Hash Table時,最好可以把數據全部載入到記憶體中,因為這樣效率才最高,這也決定了hash join只適合於較小的表,如果是兩個較大的表的場景就不適用了。

 

上文說,hash join是傳統資料庫中的單機join演算法,在分散式環境在需要經過一定的分散式改造,說到底就是儘可能利用分散式計算資源進行並行計算,提高總體效率,hash join分散式改造一般有以下兩種方案:

  1.broadcast hash join:將其中一張較小的表通過廣播的方式,由driver發送到各個executor,大表正常被分成多個區,每個分區的數據和本地的廣播變數進行join(相當於每個executor上都有一份小表的數據,並且這份數據是在記憶體中的,過來的分區中的數據和這份數據進行join)。broadcast適用於表很小,可以直接被廣播的場景;

  2.shuffle hash join:一旦小表比較大,此時就不適合使用broadcast hash join了。這種情況下,可以對兩張表分別進行shuffle,將相同key的數據分到一個分區中,然後分區和分區之間進行join。相當於將兩張表都分成了若幹小份,小份和小份之間進行hash join,充分利用集群資源。

 

Broadcast Hash Join

大家都知道,在資料庫的常見模型中(比如星型模型或者雪花模型),表一般分為兩種:事實表和維度表,維度表一般指固定的、變動較少的表,例如聯繫人、物品種類,一般數據有限;而事實表一遍記錄流水,比如銷售清單等,通過隨著時間的增長不斷增長。

因為join操作是對兩個表中key相同的記錄進行連接,在SparkSQL中,對兩個表做join的最直接的方式就是先根據key進行分區,再在每個分區中把key相同的記錄拿出來做連接操作,但這樣不可避免的涉及到shuffle,而shuffle是spark中比較耗時的操作,我們應該儘可能的設計spark應用使其避免大量的shuffle操作。

Broadcast Hash Join的條件有以下幾個:

  1.被廣播的表需要小於spark.sql.autoBroadcastJoinThreshold所配置的信息,預設是10M;

  2.基表不能被廣播,比如left outer join時,只能廣播右表。

看起來廣播是一個比較理想的方案,但它有沒有缺點呢?缺點也是很明顯的,這個方案只能廣播較小的表,否則數據的冗餘傳輸就是遠大於shuffle的開銷;另外,廣播時需要被廣播的表collect到driver端,當頻繁的廣播出現時,對driver端的記憶體也是一個考驗。

 

如下圖所示,broadcast hash join可以分為兩步:

  1.broadcast階段:將小表廣播到所有的executor上,廣播的演算法有很多,最簡單的是先發給driver,driver再統一分發給所有的executor,要不就是基於bittorrete的p2p思路;

  2.hash join階段:在每個executor上執行 hash join,小表構建為hash table,大表的分區數據匹配hash table中的數據;

                                                                                          

 

Shuffle Hash Join

 當一側的表比較小時,我們可以選擇將其廣播出去以避免shuffle,提高性能。但因為被廣播的表首先被collect到driver端,然後被冗餘的發送給各個executor上,所以當表比較大是,採用broadcast join會對driver端和executor端造成較大的壓力。

我們可以通過將大表和小表都進行shuffle分區,然後對相同節點上的數據的分區應用hash join,即先將較小的表構建為hash table,然後遍歷較大的表,在hash table中尋找可以匹配的hash值,匹配成功進行join連接。這樣既在一定程度上減少了driver廣播表的壓力,也減少了executor端讀取整張廣播表的記憶體消耗。

 

Sshuffle Hash Join分為兩步:

  1.對兩張表分別按照join key進行重分區(分區函數相同的時候,相同的相同分區中的key一定是相同的),即shuffle,目的是為了讓相同join key的記錄分到對應的分區中;

  2.對對應分區中的數據進行join,此處先將小表分區構建為一個hash表,然後根據大表中記錄的join key的hash值拿來進行匹配,即每個節點山單獨執行hash演算法。

Shuffle Hash Join的條件有以下幾個:

1. 分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold所配置的值,預設是10M 

2. 基表不能被廣播,比如left outer join時,只能廣播右表

3. 一側的表要明顯小於另外一側,小的一側將被廣播(明顯小於的定義為3倍小,此處為經驗值)

                                                                                   

看到這裡,可以初步總結出來如果兩張小表join可以直接使用單機版hash join;如果一張大表join一張極小表,可以選擇broadcast hash join演算法;而如果是一張大表join一張小表,則可以選擇shuffle hash join演算法;那如果是兩張大表進行join呢?

 

Sort Merge Join

上面介紹的方式只對於兩張表有一張是小表的情況適用,而對於兩張大表,但當兩個表都非常大時,顯然無論哪種都會對計算記憶體造成很大的壓力。這是因為join時兩者採取都是hash join,是將一側的數據完全載入到記憶體中,使用hash code取join key相等的記錄進行連接。

當兩個表都非常大時,SparkSQL採用了一種全新的方案來對錶進行Join,即Sort Merge Join。這種方式不用將一側數據全部載入後再進行hash join,但需要在join前將數據進行排序。

首先將兩張表按照join key進行重新shuffle,保證join key值相同的記錄會被分在相應的分區,分區後對每個分區內的數據進行排序,排序後再對相應的分區內的記錄進行連接。可以看出,無論分區有多大,Sort Merge Join都不用把一側的數據全部載入到記憶體中,而是即用即丟;因為兩個序列都有有序的,從頭遍歷,碰到key相同的就輸出,如果不同,左邊小就繼續取左邊,反之取右邊。從而大大提高了大數據量下sql join的穩定性。

 

SparkSQL對兩張大表join採用了全新的演算法-sort-merge join,如下圖所示,整個過程分為三個步驟:

                                                                                          

 

. shuffle階段:將兩張大表根據join key進行重新分區,兩張表數據會分佈到整個集群,以便分散式並行處理;

2. sort階段:對單個分區節點的兩表數據,分別進行排序;

3. merge階段:對排好序的兩張分區表數據執行join操作。join操作很簡單,分別遍歷兩個有序序列,碰到相同join key就merge輸出,否則取更小一邊,見下圖示意:

                                                                                                                                                    

 

 

參考:

https://www.cnblogs.com/0xcafedaddy/p/7614299.html

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 第一次寫博客,各位湊合著看吧(假裝有人看)。 我這裡使用的是centos7。 1、首先打開終端,查看有沒有安裝過MySQL: 若為空則說明沒有安裝過,若要刪除可用yum remove mysql命令。 2、下載mysql的repo源: 安裝mysql-community-release-el7-5. ...
  • 作者:天山老妖S 鏈接:http://blog.51cto.com/9291927 一、存儲過程簡介 1、存儲過程簡介 存儲過程是一組具有特定功能的SQl語句集組成的可編程的函數,經編譯創建並保存在資料庫中,用戶可通過指定存儲過程的名字並給定參數來調用執行。 存儲過程是資料庫管理中常用的技術之一,可 ...
  • 事務定義 事務是單個的工作單元。事務是在資料庫上按照一定的邏輯順序執行的任務序列,既可以由用戶手動執行,也可以由某種資料庫程式自動執行。 事務分類 自動提交事務 每條單獨的語句都是一個事務。 顯式事務 每個事務均以 BEGIN TRANSACTION 語句顯式開始,以 COMMIT 或 ROLLBA ...
  • 如果我們用成語來形容近幾年的大數據產業,也許最合適的就是:如火如荼! 從大量融資、大數據從業者薪資上漲、從研發到商業應用的技術,到2017年的大數據產業可以說已經贏得了全世界的關註。然而,當涉及到大數據時,很多人認為普通人根本無法進去。真的是這樣嗎?普通人只看招聘人員的巨額薪水嗎? 事實上,只要找到 ...
  • 歡迎大家前往 "騰訊雲+社區" ,獲取更多騰訊海量技術實踐乾貨哦~ 本文由 "騰訊雲資料庫 TencentDB" 發表於 "雲+社區專欄" 鄒鵬 ,騰訊高級工程師,騰訊雲資料庫Redis負責人,多年資料庫、網路安全研發經驗。在網路、計算、存儲、安全等領域有深入的研究和豐富的產品化經驗。 在Redis ...
  • 本文系列文章: ​ 使用Shell 操作 MongoDB的技巧 ​ MongoTemplate的使用技巧及其註意事項 敬請期待。 前言 最近公司想要做一個用戶行為數據的收集,最開始想用mysql來存儲後來發現這種方式對於不固定數據格式的保存存在局限性,也不利於查詢統計操作。所以衍生了使用mongod ...
  • Oracle視圖詳解 一. 視圖的定義 視圖(view),也稱虛表, 不占用物理空間,這個也是相對概念,因為視圖本身的定義語句還是要存儲在數據字典里的。視圖只有邏輯定義。每次使用的時候,只是重新執行SQL。 視圖是從一個或多個實際表中獲得的,這些表的數據存放在資料庫中。那些用於產生視圖的表叫做該視圖 ...
  • LevelDb有如下一些特點: 首先,LevelDb是一個持久化存儲的KV系統,和Redis這種記憶體型的KV系統不同,LevelDb不會像Redis一樣狂吃記憶體,而是將大部分數據存儲到磁碟上。 其次,LevleDb在存儲數據時,是根據記錄的key值有序存儲的,就是說相鄰的key值在存儲文件中是依次順 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...