Spark操作:Aggregate和AggregateByKey

来源:http://www.cnblogs.com/mstk/archive/2017/06/13/7000509.html
-Advertisement-
Play Games

1. Aggregate Aggregate即聚合操作。直接上代碼: acc即(0,0),number即data,seqOp將data的值累加到Tuple的第一個元素,將data的個數累加到Tuple的第二個元素。由於沒有分區,所以combOp是不起作用的,這個例子裡面即使分區了,combOp起作用 ...


1. Aggregate

Aggregate即聚合操作。直接上代碼:

import org.apache.spark.{SparkConf, SparkContext}

object AggregateTest {

  def main(args:Array[String]) = {

    // 設置運行環境
    val conf = new SparkConf().setAppName("Aggregate Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar"))
    val sc = new SparkContext(conf)

    var data = List(2,5,8,1,2,6,9,4,3,5)
    var res = data.par.aggregate((0,0))(
      // seqOp
      (acc, number) => (acc._1+number, acc._2+1),
      // combOp
      (par1, par2) => (par1._1+par2._1, par1._2+par2._2)
    )

    println(res)

    sc.stop
  }

}

acc即(0,0),number即data,seqOp將data的值累加到Tuple的第一個元素,將data的個數累加到Tuple的第二個元素。由於沒有分區,所以combOp是不起作用的,這個例子裡面即使分區了,combOp起作用了,結果也是一樣的。

運行結果:

(45,10)

2. AggregateByKey

AggregateByKey和Aggregate差不多,也是聚合,不過它是根據Key的值來聚合。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/6/13.
  */
object AggregateByKeyTest {

  def main(args:Array[String]) = {

    // 設置運行環境
    val conf = new SparkConf().setAppName("AggregateByKey Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar"))
    val sc = new SparkContext(conf)

    val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8))
    val rdd = sc.parallelize(data)

    val res : RDD[(Int,Int)] = rdd.aggregateByKey(0)(
      // seqOp
      math.max(_,_),
      // combOp
      _+_
    )

    res.collect.foreach(println)
    sc.stop
  }

}

根據Key值的不同,可以分為3個組:

(1)  (1,3),(1,2),(1,4);

(2)  (2,3);

(3)  (3,6),(3,8)。

這3個組分別進行seqOp,也就是(K,V)裡面的V和0進行math.max()運算,運算結果和下一個V繼續運算,以第一個組為例,運算過程是這樣的:

0, 3 => 3

3, 2 => 3

3, 4 => 4

所以最終結果是(1,4)。combOp是對把各分區的V加起來,由於這裡並沒有分區,所以實際上是不起作用的。

運行結果:

(2,3)
(1,4)
(3,8)

如果生成RDD時分成3個區:

val rdd = sc.parallelize(data,3)

運行結果就變成了:

(3,8)
(1,7)
(2,3)

這是因為一個分區返回(1,3),另一個分區返回(1,4),combOp將這兩個V加起來,就得到了(1,7)。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 線性回歸演算法,是利用數理統計中回歸分析,來確定兩種或兩種以上變數間相互依賴的定量關係的一種統計分析方法。 1. 梯度下降法 線性回歸可以使用最小二乘法,但是速度比較慢,因此一般使用梯度下降法(Gradient Descent),梯度下降法又分為批量梯度下降法(Batch Gradient Desce ...
  • 目錄 一、pymysql 二、SQLAlchemy 一、pymysql pymsql是Python中操作MySQL的模塊,其使用方法和MySQLdb幾乎相同。 1. 下載安裝 2. 使用操作 a. 執行SQL b. 獲取新創建數據自增ID c. 獲取查詢數據 註:在fetch數據時按照順序進行,可以 ...
  • Elasticsearch快速入門 第1篇:Elasticsearch入門 Elasticsearch快速入門 第2篇:Elasticsearch和Kibana安裝 Elasticsearch快速入門 第3篇:Elasticsearch索引和文檔操作 Elasticsearch快速入門 第4篇:El ...
  • 一直習慣使用sys.master_files來統計資料庫的大小以及使用情況,但是發現sys.master_files不能準確統計tempdb的資料庫大小信息。如下所示: SELECT database_id AS DataBaseId ,DB_NAME(database_id) ... ...
  • 本文主要參考 http://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/RackAwareness.html hadoop組件是機棧敏感(譯註rack,機棧,可以簡單理解為節點的擺放)。 例如,HDFS塊的分佈會利用 ...
  • MSSQL 2016支持了utf8編碼的文件,之前處理比較麻煩的bcp 方式導入特殊字元一下子就方便了。 但是之前的版本,處理起來還是有一點麻煩。這次處理使用的資料庫版本是sql server 2014, 用於測試的例子用2個字元串 T3 Rénové tout Confort proche mét ...
  • Power BI 報表伺服器讓你的用戶能夠訪問數據、獲取見解,並能夠使用 SQL 報表伺服器服務的企業報告功能 - 這一切都在現代本地解決方案中完成。讓用戶能夠直觀瀏覽數據並快速發現模式,以便更快作出更好的決策。同時生成滿足你業務需求的精確格式的報表。你還可以胸有成竹地擴展到數千名用戶,因為 Pow ...
  • SQL%NOTFOUND 是一個布爾值。與最近的sql語句(update,insert,delete,select)發生交互,當最近的一條sql語句沒有涉及任何行的時候,則返回true。否則返回false。這樣的語句在實際應用中,是非常有用的。例如要update一行數據時,如果沒有找到,就可以作相應 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...