spark 基礎開發 Tips總結

来源:https://www.cnblogs.com/ityuanmanito/archive/2018/11/12/9945447.html
-Advertisement-
Play Games

本篇博客主要是 sparksql 從初始開發註意的一些基本點以及力所能及的可優化部分的介紹: 所使用spark版本:2.0.0 scala版本:2.11.8 1. SparkSession的初始化: 註意點: a. spark.sql.warehouse.dir 需要顯示設置,否則會拋出 Excep ...


本篇博客主要是 sparksql 從初始開發註意的一些基本點以及力所能及的可優化部分的介紹:  

所使用spark版本:2.0.0       scala版本:2.11.8

1. SparkSession的初始化:

 

val sparkSession = SparkSession.builder().master("local[*]").appName("AppName").config("spark.sql.warehouse.dir", "file:///D:/XXXX/XXXX/spark-warehouse").config("spark.sql.shuffle.partitions", 50).getOrCreate()

  

 註意點:

             a.  spark.sql.warehouse.dir 需要顯示設置,否則會拋出 Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:...   錯誤

             b. spark.sql.shuffle.partitions  指定 Shuffle 時 Partition 個數,也即 Reducer 個數。根據業務數據量測試調整最佳結果

                Partition 個數不宜設置過大:

               Reducer(代指 Spark Shuffle 過程中執行 Shuffle Read 的 Task) 個數過多,每個 Reducer 處理的數據量過小。大量小 Task 造成不必要的 Task 調度開銷與可能的資源調度開銷(如果開啟了 Dynamic Allocation)

            Reducer 個數過大,如果 Reducer 直接寫 HDFS 會生成大量小文件,從而造成大量 addBlock RPC,Name node 可能成為瓶頸,並影響其它使用 HDFS 的應用

            過多 Reducer 寫小文件,會造成後面讀取這些小文件時產生大量 getBlock RPC,對 Name node 產生衝擊

               Partition 個數不宜設置過小:

            每個 Reducer 處理的數據量太大,Spill 到磁碟開銷增大

            Reducer GC 時間增長

            Reducer 如果寫 HDFS,每個 Reducer 寫入數據量較大,無法充分發揮並行處理優勢

2. 將非結構化數據轉換為結構化數據DataFrame(本人用的自定義模式): 

    val rdd= sparkSession.sparkContext.textFile(path, 250)  // 預設split為2

    val schemaString = "time hour lic"   //結構化數據的列名,可理解為關係型資料庫的列名

    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))   // 欄位名  欄位類型  是否可為空

    val schema = StructType(fields)      //上兩步組裝最終 createDataFrame 時需要的 schema

    val rowRDD = citySECRDD.map(_.split(",")).filter(attributes => attributes.length >= 6 && attributes(1).equals("2")&& attributes(0).split(" ").length > 1 && attributes(0).split(" ")(1).split(":").length > 1).map(attributes => {Row(attributes(0).trim,attributes(0).split(" "                   (1).split(":")(0).trim,attributes(2).trim,attributes(3).trim,attributes(4).trim,attributes(5).trim)})         //自定義一些過濾條件  以及組裝最終的 row類型的RDD

    val df= sparkSession.createDataFrame(rowRDD, schema)       //將rdd裝換成DataFrame

3. 兩種緩存使用方式:

    1)df.persist(StorageLevel.MEMORY_ONLY)     //後續如果需要反覆使用DF[DataFrame的簡稱],則就把此DF緩存起來                            
df.unpersist() //釋放緩存 常用的兩種序列化方式:MEMORY_ONLY->不加工在記憶體中存儲 MEMORY_ONLY_SER->在記憶體中序列化存儲(占用記憶體空間較小) 2)df.createOrReplaceTempView("table") sparkSession.sql("cache table table") // 以 sql 形式緩存DF
sparkSession.sql("uncache table table") //釋放緩存

4.spark整合Hbase快速批量插入

  將計算結果寫入Hbase:

      註意:1) 如果是帶有shuffle過程的,shuffle計算之前使用select()提出只需要的欄位然後再進行計算,因為shuffle特別耗費時間,寫磁碟的過程,所以要能少寫就少寫。

df.foreachPartition(partition => {

      val hconf = HBaseConfiguration.create();

      hconf.set(zkClientPort, zkClientPortValue) //zk 埠

      hconf.set(zkQuorum, zkQuorumValue) //zk 地址
      hconf.set(hbaseMaster, hbaseMasterValue) //hbase master
       val myTable = new HTable(hconf, TableName.valueOf(tableName))
       myTable.setAutoFlush(false, false) //關鍵點1
      myTable.setWriteBufferSize(5 * 1024 * 1024) //關鍵點2
      partition.foreach(x => {

      val column1 = x.getAs[String]("column1") //列1
      val column2 = x.getAs[String]("column2") //列2
      val column3 = x.getAs[Double]("column3") //列3
      val date = dateStr.replace("-", "") // 格式化後的日期

    val rowkey = MD5Hash.getMD5AsHex(Bytes.toBytes(column1+ date)) + Bytes.toBytes(hour)
    val put = new Put(Bytes.toBytes(rowkey))
    put.add("c1".getBytes(), "column1".getBytes(), licPlateNum.getBytes()) //第一列族 第一列 
    put.add("c1".getBytes(), "column2".getBytes(), hour.getBytes()) //第一列族 第二列
    put.add("c1".getBytes(), "column3".getBytes(), interval.toString.getBytes()) //第一列族 第三列
    put.add("c1".getBytes(), "date".getBytes(), date.getBytes()) //第一列族 第四列
    myTable.put(put)
     })
     myTable.flushCommits() //關鍵點3
    /*
    *關鍵點1_:將自動提交關閉,如果不關閉,每寫一條數據都會進行提交,是導入數據較慢的做主要因素。
     關鍵點2:設置緩存大小,當緩存大於設置值時,hbase會自動提交。此處可自己嘗試大小,一般對大數據量,設置為5M即可,本文設置為3M。
     關鍵點3:每一個分片結束後都進行flushCommits(),如果不執行,當hbase最後緩存小於上面設定值時,不會進行提交,導致數據丟失。
     註:此外如果想提高Spark寫數據如Hbase速度,可以增加Spark可用核數量。
    */

5. spark任務提交shell腳本:

spark-submit --jars /XXX/XXX/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar \
         --master yarn\
         --num-executors 200 \
         --conf "spark.driver.extraClassPath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \
         --conf "spark.executor.extraClassPath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \ 
         --conf spark.driver.cores=2 \
         --conf spark.driver.memory=10g \
         --conf spark.driver.maxResultSize=2g \
         --conf spark.executor.cores=6 \
         --conf spark.executor.memory=10g \
         --conf spark.shuffle.blockTransferService=nio \
         --conf spark.memory.fraction=0.8 \
         --conf spark.shuffle.memoryFraction=0.4 \               
         --conf spark.default.parallelism=1000 \
         --conf spark.sql.shuffle.partitions=400 \                     預設200,如果項目中代碼設置了此選項,則代碼設置級別優先,會覆蓋此處設置
         --conf spark.shuffle.consolidateFiles=true \
         --conf spark.shuffle.io.maxRetries=10 \
         --conf spark.scheduler.listenerbus.eventqueue.size=1000000 \
         --class XXXXX\                                                                項目啟動主類引用
         --name zzzz \
         /data/XXX/XXX-jar-with-dependencies.jar \                       項目jar包
        "參數1" "參數2" 

  

註: 紅色部分是Hbase需要的配置,同時需要在spark集群的spark-defaults.conf 裡面配置

        spark.driver.extraClassPath  和  spark.executor.extraClassPath   直指 hbase-protocol-0.96.1.1-cdh5.0.2.jar 路徑

先寫到這裡吧,後續會繼續完善通過sparkUi 優化細節以及提交spark任務的時候 如何分配 executor.cores 和 executor.memory。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1 虛擬運行時間(今日內容提醒) 1.1 虛擬運行時間的引入 CFS為了實現公平,必須懲罰當前正在運行的進程,以使那些正在等待的進程下次被調度。 具體實現時,CFS通過每個進程的虛擬運行時間(vruntime)來衡量哪個進程最值得被調度。 CFS中的就緒隊列是一棵以vruntime為鍵值的紅黑樹,虛 ...
  • 有一段時間沒有寫學習心得了;現在開始加油,再接再勵。 從最基礎的開始 1.安裝centOS7.3之後設置IP地址。一般linux的系統都是作為伺服器的系統來使用,伺服器的屬性註定了他的IP不能隨意的更變,所以需要設置一個固定的IP地址。 一般centos系統安裝完成後,IP都是通過dhcp來獲得的。 ...
  • LVM邏輯捲管理器 為什麼要使用邏輯捲? 邏輯捲管理器是Linux系統用於對硬碟分區進行管理的一種機制,為瞭解決硬碟設備在創建分區後不易修改分區大小的缺陷。儘管對傳統的硬碟分區進行強制擴容或縮容從理論上講是可行的。但是卻可能造成數據的丟失。LVM技術是在硬碟分區和文件系統之間添加了一個邏輯層,它提供 ...
  • whereis命令用於查找文件。 該指令會在特定目錄中查找符合條件的文件。這些文件應屬於原始代碼、二進位文件,或是幫助文件。 該指令只能用於查找二進位文件、源代碼文件和man手冊頁,一般文件的定位需使用locate命令。 一.命令格式: whereis [ bfmsu][ B ...][ M ... ...
  • 一、命令介紹 mkdir 命令用於創建空白目錄格式為“mkdir [選項] 目錄”, 除了能夠創建單個空白目錄,還能結合 -p 參數來遞歸創建具有嵌套層疊關係的文件目錄。 二、實例 使用mkdir命令在當前目錄創建一個名為new的文件目錄 執行 mkdir new 可以看到文件目錄 new 已經創建 ...
  • 任務二 表數據的插入、修改及刪除 @[toc] | 班級 | 姓名 | | | | |軟體工程16 9班 | 洪燕妮| 【實訓目的與要求】 1、利用MySQL命令行視窗進行增、刪、改數據操作; 2、利用界面工具進行增、刪、改數據操作。 【實訓原理】 MySQL的增、刪、改數據操作命令。 【實訓步驟】 ...
  • 一.概述 整數集合(intset)是集合鍵的底層實現之一, 當一個集合只包含整數值元素,並且這個集合元素數量不多時, Redis就會使用整數集合作為集合鍵的底層實現。下麵創建一個只包含5個元素的集合鍵,並且集合中所有元素都是整數值,那麼這個集合鍵的底層實現就會是整數集合。 接著添加非整數值,集合鍵的 ...
  • 用戶定義函數定義 與編程語言中的函數類似,SQL Server 用戶定義函數是接受參數、執行操作(例如複雜計算)並將操作結果以值的形式返回的常式。 返回值可以是單個標量值或結果集。 用戶定義函數準則 在函數中,將會區別處理導致語句被取消並繼續執行模塊(如觸發器或存儲過程)中的下一個語句的 Trans ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...