Spark快速上手(3)Spark核心編程-RDD轉換運算元

来源:https://www.cnblogs.com/unknownshangke/archive/2022/07/01/16434018.html
-Advertisement-
Play Games

#RDD(2) ##RDD轉換運算元 RDD根據數據處理方式的不同將運算元整體上分為Value類型、雙Value類型、Key-Value類型 ###value類型 ####map 函數簽名 def map[U:ClassTag](f:T=>U):RDD[U] 函數說明 將處理的數據逐條進行映射轉換,這裡 ...


RDD(2)

RDD轉換運算元

RDD根據數據處理方式的不同將運算元整體上分為Value類型、雙Value類型、Key-Value類型

value類型

map

函數簽名
def map[U:ClassTag](f:T=>U):RDD[U]
函數說明
將處理的數據逐條進行映射轉換,這裡的轉換可以是類型的轉換,也可以是值的轉換
e.g.1

 val source = sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6))
    val map = source.map(item => item*10)
    val result = map.collect()
    result.foreach(println)

e.g.2

   val data1: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4), 2)
//    val data2: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4), 1)
    val rdd1: RDD[Int] = data1.map(
      num => {
        println(">>>" + num)
        num
      }
    )
    val rdd2: RDD[Int] = rdd1.map(
      num => {
        println("<<<" + num)
        num
      }
    )
    rdd2.collect()

note:
RDD計算同一分區內數據有序,不同分區數據無序

(func)從伺服器日誌數據apache.log中獲取用戶請求URL資源路徑(例):
apache.log

83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png

code:

val data = sparkContext.textFile("input/apache.log")
    val clean = data.map{
      item => {
        item.split(" ")(6)
      }
    }
    clean.foreach(println(_))

mapPartitions

函數簽名

def mapPartitions[U:ClassTag](
  f:Iterator[T] =>Iterator[U],
  preservesPartitioning:Boolean = false):RDD[U]

函數說明
將待處理的數據以分區為單位發送到計算節點進行任意的處理(過濾數據亦可)
note: 函數會將整個分區的數據載入到記憶體中進行引用。記憶體較小、數據量較大的情況下,容易出現記憶體溢出。

val dataRDD1: RDD[Int]= dataRDD.mapPartitions(
  datas =>{            //遍歷每個分區進行操作
    datas.filter(_==2) //過濾每個分區中值為2的數據
  }
)

(func)獲取每個數據分區的最大值

code:

object getMaxFromArea {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Max")
    val sparkContext: SparkContext = new SparkContext(sparkConf)
    val source: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val mapPartition: RDD[Int] = source.mapPartitions(p => List(p.max).iterator)

    //多個分區獲取最大值,使用迭代器
    val result: Array[Int] = mapPartition.collect()
    result.foreach(println)
    sparkContext.stop()

  }
}

comparison:

map和mapPartitions的區別

數據處理角度
Map 運算元是分區內一個數據一個數據的執行,類似於串列操作。而 mapPartitions 運算元
是以分區為單位進行批處理操作。

功能的角度
Map 運算元主要目的將數據源中的數據進行轉換和改變。但是不會減少或增多數據。
MapPartitions 運算元需要傳遞一個迭代器,返回一個迭代器,沒有要求的元素的個數保持不變,
所以可以增加或減少數據

性能的角度
Map 運算元因為類似於串列操作,所以性能比較低,而是 mapPartitions 運算元類似於批處
理,所以性能較高。但是 mapPartitions 運算元會長時間占用記憶體,那麼這樣會導致記憶體可能
不夠用,出現記憶體溢出的錯誤。所以在記憶體有限的情況下,不推薦使用。使用 map 操作

mapPartitionsWithIndex

函數簽名
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
函數說明
將待處理的數據以分區為單位發送到計算節點進行處理,這裡的處理是指可以進行任意的處理,哪怕是過濾數據,在處理時同時可以獲取當前分區索引

val dataRDD1 = dataRDD.mapPartitionsWithIndex(
 (index, datas) => {
      datas.map(index, _)
 }
)

(func)獲取第二個數據分區的數據

code:

object getSecondArea {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Sec")
    val sparkContext: SparkContext = new SparkContext(sparkConf)
    val source: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val mapPartitionsWithIndex: RDD[Int] = source.mapPartitionsWithIndex(
      (index, data) => {
        if (index == 1) {
          data
        } else {
          Nil.iterator
        }
      }
    ) 
    val result: Array[Int] = mapPartitionsWithIndex.collect()
    result.foreach(println(_))
    sparkContext.stop()
  }

}

(func)獲取每個數據及其對應分區索引

code:

object getDataAndIndexOfArea {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Data")
    val sparkContext: SparkContext = new SparkContext(conf)

    val data: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4, 5, 6))
    val dataAndIndex: RDD[(Int, Int)] = data.mapPartitionsWithIndex(
      (index, iter) => {
        iter.map(data => (data, index))
      }
    )

    val result: Array[(Int, Int)] = dataAndIndex.collect()
    result.foreach(println)

  }

}


note:這裡沒有自定義分區數量,故預設最多分區數(與機器邏輯處理器數量相關),數據隨機存儲在這些分區中

flatMap

函數簽名
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
函數說明
將處理的數據進行扁平化後再進行映射處理,所以運算元也稱作扁平映射

val dataRDD = sparkContext.makeRDD(
	List(List(1,2),List(3,4)),1)
val dataRDD1 = dataRDD.flatMap(list => list)
1
2
3
4

(func)將List("Hello World","Hello Spark")進行扁平化操作
1)字元扁平化

val data = sparkContext.makeRDD(List("Hello World","Hello Spark"),1)
    val rdd: RDD[Char] = data.flatMap(list => list)
    val result1: Array[Char] = rdd.collect()
    result1.foreach(item => print(item+" "))

2)字元串扁平化

val data1: RDD[String] = sparkContext.parallelize(List("Hello World", "Hello Spark"), 1)
    val rdd1: RDD[String] = data1.flatMap(list => {
      list.split(" ")
    })
    val result2: Array[String] = rdd1.collect()
    result2.foreach(item => println(item + " "))


(func)將List(List(1,2),3,List(4,5))進行扁平化操作

thinking:List(List(1,2),3,List(4,5)) => List(list,int,list) => RDD[Any]
當數據的格式不能夠滿足時我們可以使用match進行格式的匹配(類似java中的switch,case)

code:

val data2: RDD[Any] = sparkContext.parallelize(List(List(1, 2), 3, List(4, 5)))
    val rdd2: RDD[Any] = data2.flatMap {
   //  完整代碼:
      //          dat =>{
      //            dat match {
      //              case list: List[_] => list
      //              case int => List(int)
      //            }
      //          }
      case list: List[_] => list
      case int => List(int)
    }
    val result3: Array[Any] = rdd2.collect()
    result3.foreach(item => print(item + " "))


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 來源:cnblogs.com/youzhibing/p/15354706.html 疑慮背景 疑慮描述 最近,在進行開發的過程中,發現之前的一個寫法,類似如下: 以我的理解,@Configuration 加 @Bean 會創建一個 userName 不為 null 的 UserManager 對象, ...
  • 進入移動互聯網時代以來,Windows桌面開發已經很久不碰了。之前就是從做Windows開發入行的。 當年,還是C++ VC6, MFC的時代。那時候開發要查的是MSDN :-)。記憶體要自己管理, 排查記憶體泄漏(忘了釋放分配的記憶體)也是基本日常。光陰似箭,歲月如梭~! 幾年之前,北漂時需要寫一個wi ...
  • Sorted Set (ZSet) 數據結構 Sorted Set (ZSet), 即有序集合, 底層使用 壓縮列表(ziplist) 或者 跳躍表(skiplist) 使用 壓縮列表(ziplist) 當同時滿足下麵兩個條件時,使用 ziplist 存儲數據 元素個數少於128個 (zset-ma ...
  • 1 環境準備: VirtualBox下載地址:https://www.virtualbox.org/wiki/Downloads,根據自己的系統類型進行下載安裝即可。 openEuler ISO下載地址:https://www.openeuler.org/zh/download/,選擇自己想要的版本 ...
  • 一、概述 RAID ( Redundant Array of Independent Disks )即獨立磁碟冗餘陣列,通常簡稱為磁碟陣列。簡單地說, RAID 是由多個獨立的高性能磁碟驅動器組成的磁碟子系統,從而提供比單個磁碟更高的存儲性能和數據冗餘高可靠性的存儲技術。RAID分為硬 RAID、全 ...
  • Dreamweaver 2021 mac版是目前行業中最優秀的一款網站開發利器,新版本的dw 2021下載比以往任何版本都更專註、更高效和快速,具備全新代碼編輯器、更直觀的用戶界面和多種增強功能。強大的功能可以幫助編程人員更輕鬆、高效的設計網頁。 Dreamweaver 2021 for Mac(D ...
  • 鏡像下載、功能變數名稱解析、時間同步請點擊 阿裡雲開源鏡像站 Maven集成 在Jenkins上發佈Java項目時需要使用Maven來進行構建打包(Gradle項目則需要安裝配置Gradle) 1.1 環境準備 這篇文章是在前一篇文章的基礎上 maven包下載地址 [root@192 java]# pwd ...
  • 記錄如何通過 valgrind 的 memcheck 工具分析定位記憶體泄漏的問題 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...