Spark學習筆記4:數據讀取與保存

来源:http://www.cnblogs.com/caiyisen/archive/2017/09/16/7527459.html
-Advertisement-
Play Games

Spark對很多種文件格式的讀取和保存方式都很簡單。Spark會根據文件擴展名選擇對應的處理方式。 Spark支持的一些常見文件格式如下: 1、文本文件 使用文件路徑作為參數調用SparkContext中的textFile()函數,就可以讀取一個文本文件。也可以指定minPartitions控制分區 ...


Spark對很多種文件格式的讀取和保存方式都很簡單。Spark會根據文件擴展名選擇對應的處理方式。

Spark支持的一些常見文件格式如下:

 1、文本文件

   使用文件路徑作為參數調用SparkContext中的textFile()函數,就可以讀取一個文本文件。也可以指定minPartitions控制分區數。傳遞目錄作為參數,會把目錄中的各部分都讀取到RDD中。例如:

val input = sc.textFile("E:\\share\\new\\chapter5")
input.foreach(println)

 chapter目錄有三個txt文件,內容如下:

 

輸出結果:

用SparkContext.wholeTextFiles()也可以處理多個文件,該方法返回一個pair RDD,其中鍵是輸入文件的文件名。

例如:

    val input = sc.wholeTextFiles("E:\\share\\new\\chapter5")
    input.foreach(println)

  輸出結果:

保存文本文件用saveAsTextFile(outputFile)

  •  JSON

JSON是一種使用較廣的半結構化數據格式,這裡使用json4s來解析JSON文件。

如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.json4s.ShortTypeHints
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization

object TestJson {

  case class Person(name:String,age:Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("JSON")
    val sc = new SparkContext(conf)
    implicit val formats = Serialization.formats(ShortTypeHints(List()))
    val input = sc.textFile("E:\\share\\new\\test.json")
    input.collect().foreach(x => {var c = parse(x).extract[Person];println(c.name + "," + c.age)})

  }

}

 json文件內容:

輸出結果:

保存JSON文件用saveASTextFile(outputFile)即可

如下:

val datasave = input.map { myrecord =>
      implicit val formats = DefaultFormats
      val jsonObj = parse(myrecord)
      jsonObj.extract[Person]
    }
datasave.saveAsTextFile("E:\\share\\spark\\savejson")

輸出結果:

  • CSV文件

 讀取CSV文件和讀取JSON數據相似,都需要先把文件當作普通文本文件來讀取數據,再對數據進行處理。

如下:

import org.apache.spark.{SparkConf, SparkContext}
import java.io.StringReader

import au.com.bytecode.opencsv.CSVReader

object DataReadAndSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("CSV")
    val sc = new SparkContext(conf)

    val input = sc.textFile("E:\\share\\spark\\test.csv")
    input.foreach(println)
    val result = input.map{
      line =>
        val reader = new CSVReader(new StringReader(line))
        reader.readNext()
    }
    for(res <- result){
      for(r <- res){
        println(r)
      }
    }
  }

}

test.csv內容:

輸出結果:

 保存csv

如下:

val inputRDD = sc.parallelize(List(Person("Mike", "yes")))
        inputRDD.map(person  => List(person.name,person.favoriteAnimal).toArray)
        .mapPartitions { people =>
          val stringWriter = new StringWriter()
          val csvWriter = new CSVWriter(stringWriter)
          csvWriter.writeAll(people.toList)
          Iterator(stringWriter.toString)
        }.saveAsTextFile("E:\\share\\spark\\savecsv")

 

  • SequenceFile

SequenceFile是由沒有相對關係結構的鍵值對文件組成的常用Hadoop格式。是由實現Hadoop的Writable介面的元素組成,常見的數據類型以及它們對應的Writable類如下:

讀取SequenceFile

調用sequenceFile(path , keyClass , valueClass , minPartitions)

保存SequenceFile

調用saveAsSequenceFile(outputFile)

 

  • 對象文件

對象文件使用Java序列化寫出,允許存儲只包含值的RDD。對象文件通常用於Spark作業間的通信。

保存對象文件調用 saveAsObjectFile    讀取對象文件用SparkContext的objectFile()函數接受一個路徑,返回對應的RDD

 

  • Hadoop輸入輸出格式

 Spark可以與任何Hadoop支持的格式交互。

讀取其他Hadoop輸入格式,使用newAPIHadoopFile接收一個路徑以及三個類,第一個類是格式類,代表輸入格式,第二個類是鍵的類,最後一個類是值的類。

hadoopFile()函數用於使用舊的API實現的Hadoop輸入格式。

KeyValueTextInputFormat 是最簡單的 Hadoop 輸入格式之一,可以用於從文本文件中讀取鍵值對數據。每一行都會被獨立處理,鍵和值之間用製表符隔開。

 例子:

import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

object HadoopFile {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("hadoopfile").setMaster("local")
    val sc = new SparkContext(conf)

   
    val job = new Job()
    val data = sc.newAPIHadoopFile("E:\\share\\spark\\test.json" ,
      classOf[KeyValueTextInputFormat],
      classOf[Text],
      classOf[Text],
      job.getConfiguration)
    data.foreach(println)

    data.saveAsNewAPIHadoopFile(
      "E:\\share\\spark\\savehadoop",
      classOf[Text],
      classOf[Text],
      classOf[TextOutputFormat[Text,Text]],
      job.getConfiguration)

  }
}

  輸出結果:

讀取

保存

若使用舊API如下:

val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("E:\\share\\spark\\test.json
").map { case (x, y) => (x.toString, y.toString) } input.foreach(println)

  

  • 文件壓縮

對數據進行壓縮可以節省存儲空間和網路傳輸開銷,Spark原生的輸入方式(textFile和sequenFile)可以自動處理一些類型的壓縮。在讀取壓縮後的數據時,一些壓縮編解碼器可以推測壓縮類型。

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 轉載請註明原創出處,謝謝! 問題 這個Xmn設置為1G,,我用jmap heap 看,這個Eden From To怎麼不是一個整8:1:1的關係呢? 我看記憶體分配還是沒變,我Xmn1g,感覺From、To應該都是102.4M才對,現在是102.375M。 執行命令 結果: 發現很奇怪,的確和我們相信 ...
  • 轉載需要著名出處: 之前寫過微信登錄分享支付第一版: 前言 大部分的app都有接入第三方sdk的需求。例如第三方登錄需要接入微信、QQ、微博。第三方支付需要接入微信、支付寶、銀聯。 這些我都有使用過,都有使用過他們的sdk,感覺最麻煩的就是微信,不能直接調試,得用正式的簽名進行簽名才能調試。還有他們 ...
  • 數據泵技術是Oracle Database 10g 中的新技術,它比原來導入/導出(imp,exp)技術快15-45倍。速度的提高源於使用了並行技術來讀寫導出轉儲文件。 ...
  • 最近在學習Oracle的統計信息這一塊,收集統計信息的方法如下: DBMS_STATS.GATHER_TABLE_STATS ( ownname VARCHAR2, 所有者名字 tabname VARCHAR2, 表名 partname VARCHAR2 DEFAULT NULL, 要分析的分區名 ...
  • 將所學知識整理一下,備忘。 1. Hdfs (v 2.7.3) 1.1.啟動集群 註:這個啟動腳本是通過ssh對多個節點的namenode、datanode、journalnode以及zkfc進程進行批量啟動的。 1.2.啟動NameNode 1.3.啟動DataNode 1.4.停止集群 1.5. ...
  • 一.Oracle的支持數據類型 1.字元串類型 char 固定長度(定義時即已確定長度,空餘位置被補全),最大長度255,如 name char(10),'中'會占用10個長度; varchar2 長度不固定,根據實際情況占用,空餘被放棄,最大長度3999,如 name varchar2(10),'... ...
  • SQL概念:結構化查詢語言(SQL = Structured Query Language),也是一種編程語言(資料庫查詢和程式設計語言),可以用於數據的存取及查詢,更新,管理關係型資料庫系統ps: 不同資料庫系統之間的SQL不能完全相互通用;分類針對操作的對象不同,可以分成不同語言1: 數據操作( ...
  • 原創,轉發請註明出處。 MapReduce是hadoop這隻大象的核心,Hadoop 中,數據處理核心就是 MapReduce 程式設計模型。一個Map/Reduce 作業(job) 通常會把輸入的數據集切分為若幹獨立的數據塊,由 map任務(task)以完全並行的方式處理它們。框架會對map的輸出 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...