本篇博客主要是 sparksql 從初始開發註意的一些基本點以及力所能及的可優化部分的介紹: 所使用spark版本:2.0.0 scala版本:2.11.8 1. SparkSession的初始化: 註意點: a. spark.sql.warehouse.dir 需要顯示設置,否則會拋出 Excep ...
本篇博客主要是 sparksql 從初始開發註意的一些基本點以及力所能及的可優化部分的介紹:
所使用spark版本:2.0.0 scala版本:2.11.8
1. SparkSession的初始化:
val sparkSession = SparkSession.builder().master("local[*]").appName("AppName").config("spark.sql.warehouse.dir", "file:///D:/XXXX/XXXX/spark-warehouse").config("spark.sql.shuffle.partitions", 50).getOrCreate()
註意點:
a. spark.sql.warehouse.dir 需要顯示設置,否則會拋出 Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:... 錯誤
b. spark.sql.shuffle.partitions 指定 Shuffle 時 Partition 個數,也即 Reducer 個數。根據業務數據量測試調整最佳結果
Partition 個數不宜設置過大:
Reducer(代指 Spark Shuffle 過程中執行 Shuffle Read 的 Task) 個數過多,每個 Reducer 處理的數據量過小。大量小 Task 造成不必要的 Task 調度開銷與可能的資源調度開銷(如果開啟了 Dynamic Allocation)
Reducer 個數過大,如果 Reducer 直接寫 HDFS 會生成大量小文件,從而造成大量 addBlock RPC,Name node 可能成為瓶頸,並影響其它使用 HDFS 的應用
過多 Reducer 寫小文件,會造成後面讀取這些小文件時產生大量 getBlock RPC,對 Name node 產生衝擊
Partition 個數不宜設置過小:
每個 Reducer 處理的數據量太大,Spill 到磁碟開銷增大
Reducer GC 時間增長
Reducer 如果寫 HDFS,每個 Reducer 寫入數據量較大,無法充分發揮並行處理優勢
2. 將非結構化數據轉換為結構化數據DataFrame(本人用的自定義模式):
val rdd= sparkSession.sparkContext.textFile(path, 250) // 預設split為2 val schemaString = "time hour lic" //結構化數據的列名,可理解為關係型資料庫的列名 val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) // 欄位名 欄位類型 是否可為空 val schema = StructType(fields) //上兩步組裝最終 createDataFrame 時需要的 schema val rowRDD = citySECRDD.map(_.split(",")).filter(attributes => attributes.length >= 6 && attributes(1).equals("2")&& attributes(0).split(" ").length > 1 && attributes(0).split(" ")(1).split(":").length > 1).map(attributes => {Row(attributes(0).trim,attributes(0).split(" " (1).split(":")(0).trim,attributes(2).trim,attributes(3).trim,attributes(4).trim,attributes(5).trim)}) //自定義一些過濾條件 以及組裝最終的 row類型的RDD val df= sparkSession.createDataFrame(rowRDD, schema) //將rdd裝換成DataFrame
3. 兩種緩存使用方式:
1)df.persist(StorageLevel.MEMORY_ONLY) //後續如果需要反覆使用DF[DataFrame的簡稱],則就把此DF緩存起來
df.unpersist() //釋放緩存 常用的兩種序列化方式:MEMORY_ONLY->不加工在記憶體中存儲 MEMORY_ONLY_SER->在記憶體中序列化存儲(占用記憶體空間較小) 2)df.createOrReplaceTempView("table") sparkSession.sql("cache table table") // 以 sql 形式緩存DF
sparkSession.sql("uncache table table") //釋放緩存
4.spark整合Hbase快速批量插入
將計算結果寫入Hbase:
註意:1) 如果是帶有shuffle過程的,shuffle計算之前使用select()提出只需要的欄位然後再進行計算,因為shuffle特別耗費時間,寫磁碟的過程,所以要能少寫就少寫。
df.foreachPartition(partition => { val hconf = HBaseConfiguration.create(); hconf.set(zkClientPort, zkClientPortValue) //zk 埠 hconf.set(zkQuorum, zkQuorumValue) //zk 地址 hconf.set(hbaseMaster, hbaseMasterValue) //hbase master val myTable = new HTable(hconf, TableName.valueOf(tableName)) myTable.setAutoFlush(false, false) //關鍵點1 myTable.setWriteBufferSize(5 * 1024 * 1024) //關鍵點2 partition.foreach(x => { val column1 = x.getAs[String]("column1") //列1 val column2 = x.getAs[String]("column2") //列2 val column3 = x.getAs[Double]("column3") //列3 val date = dateStr.replace("-", "") // 格式化後的日期 val rowkey = MD5Hash.getMD5AsHex(Bytes.toBytes(column1+ date)) + Bytes.toBytes(hour) val put = new Put(Bytes.toBytes(rowkey)) put.add("c1".getBytes(), "column1".getBytes(), licPlateNum.getBytes()) //第一列族 第一列 put.add("c1".getBytes(), "column2".getBytes(), hour.getBytes()) //第一列族 第二列 put.add("c1".getBytes(), "column3".getBytes(), interval.toString.getBytes()) //第一列族 第三列 put.add("c1".getBytes(), "date".getBytes(), date.getBytes()) //第一列族 第四列 myTable.put(put) }) myTable.flushCommits() //關鍵點3 /* *關鍵點1_:將自動提交關閉,如果不關閉,每寫一條數據都會進行提交,是導入數據較慢的做主要因素。 關鍵點2:設置緩存大小,當緩存大於設置值時,hbase會自動提交。此處可自己嘗試大小,一般對大數據量,設置為5M即可,本文設置為3M。 關鍵點3:每一個分片結束後都進行flushCommits(),如果不執行,當hbase最後緩存小於上面設定值時,不會進行提交,導致數據丟失。 註:此外如果想提高Spark寫數據如Hbase速度,可以增加Spark可用核數量。 */
5. spark任務提交shell腳本:
spark-submit --jars /XXX/XXX/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar \ --master yarn\ --num-executors 200 \ --conf "spark.driver.extraClassPath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \ --conf "spark.executor.extraClassPath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \ --conf spark.driver.cores=2 \ --conf spark.driver.memory=10g \ --conf spark.driver.maxResultSize=2g \ --conf spark.executor.cores=6 \ --conf spark.executor.memory=10g \ --conf spark.shuffle.blockTransferService=nio \ --conf spark.memory.fraction=0.8 \ --conf spark.shuffle.memoryFraction=0.4 \ --conf spark.default.parallelism=1000 \ --conf spark.sql.shuffle.partitions=400 \ 預設200,如果項目中代碼設置了此選項,則代碼設置級別優先,會覆蓋此處設置 --conf spark.shuffle.consolidateFiles=true \ --conf spark.shuffle.io.maxRetries=10 \ --conf spark.scheduler.listenerbus.eventqueue.size=1000000 \ --class XXXXX\ 項目啟動主類引用 --name zzzz \ /data/XXX/XXX-jar-with-dependencies.jar \ 項目jar包 "參數1" "參數2"
註: 紅色部分是Hbase需要的配置,同時需要在spark集群的spark-defaults.conf 裡面配置
spark.driver.extraClassPath 和 spark.executor.extraClassPath 直指 hbase-protocol-0.96.1.1-cdh5.0.2.jar 路徑
先寫到這裡吧,後續會繼續完善通過sparkUi 優化細節以及提交spark任務的時候 如何分配 executor.cores 和 executor.memory。