Spark快速入門 - Spark 1.6.0

来源:http://www.cnblogs.com/BYRans/archive/2016/02/19/5199824.html
-Advertisement-
Play Games

Spark快速入門 Spark 1.6.0 轉載請註明出處: "http://www.cnblogs.com/BYRans/" 快速入門(Quick Start) 本文簡單介紹了Spark的使用方式。首先介紹Spark的交互界面的API使用,然後介紹如何使用Java、Scala以及Python編寫S


Spark快速入門 - Spark 1.6.0


轉載請註明出處:http://www.cnblogs.com/BYRans/

快速入門(Quick Start)

本文簡單介紹了Spark的使用方式。首先介紹Spark的交互界面的API使用,然後介紹如何使用Java、Scala以及Python編寫Spark應用。詳細的介紹請閱讀Spark Programming Guide

在按照本文進行操作之前,請確保已安裝Spark。本文中的所有操作沒有使用HDFS,所以您可以安裝任何版本的Hadoop。

Spark互動式Shell的使用(Interactive Analysis with the Spark Shell)

基礎(Basics)

Spark的互動式Shell提供了一個簡單的方式來學習Spark的API,同時也提供了強大的互動式數據處理能力。Spark Shell支持Scala和Python兩種語言。啟動支持Scala的Spark Shell方式為

./bin/spark-shell

Spark最重要的一個抽象概念是彈性分散式數據集(Resilient Distributed Dataset)簡稱RDD。RDDs可以通過Hadoop InputFormats(例如HDFS文件)創建,也可以由其它RDDs轉換而來。下麵的例子是通過載入Spark目錄下的README.md文件生成RDD的例子:

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDDs有兩種操作:

  • actions:返回計算值
  • transformations:返回一個新RDDs的引用

actions示例如下:

scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

如下transformations示例,使用filter操作返回了一個新的RDD,該RDD為文件中數據項的子集,該子集符合過濾條件:

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

Spark也支持將actions和transformations一起使用:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多RDD操作(More on RDD Operations)

RDD的actions和transformations操作可以用於更加複雜的計算。下麵是查找README.md文件中單詞數最多的行的單詞數目:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

上面代碼中,第一個map操作將一行文本按空格分隔,並計算單詞數目,將line映射為一個integer值,並創建了一個新的RDD保存這些integer值。RDD調用reduce計算最大的單詞數。示例中map和reduce操作的參數是Scala的函數式編程風格,Spark支持Scala、Java、Python的編程風格,並支持Scala/Java庫。例如,使用Scala中的Math.max()函數讓程式變得更加簡潔易讀:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

隨著Hadoop的流行,MapReduce變為一種常見的數據流模式。Spark可以輕鬆的實現MapReduce,使用Spark編寫MapReduce程式更加簡單:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

上面示例中,使用flatMap、map和reduceByKey操作來計算每個單詞在文件中出現的次數,並生成一個結構為的RDD。可以使用collect操作完成單詞統計結果的收集整合:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

緩存(Caching)

Spark支持將數據緩存到集群的分散式記憶體中。在數據會被重覆訪問的情況下,將數據緩存到記憶體能減少數據訪問時間,從而提高運行效率。尤其是在數據分佈在幾十或幾百個節點上時,效果更加明顯。下麵為將數據linesWithSpark緩存到記憶體的示例:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

獨立應用(Self-Contained Applications)

假設我們想使用Spark API編寫獨立應用程式。我們可以使用Scala、Java和Python輕鬆的編寫Spark應用。下麵示例為一個簡單的應用示例:

  • Scala
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

上面程式分別統計了README中包含字元‘a’以及‘b’的行數。與前面Spark shell例子不同的是,我們需要初始化SparkContext。
我們通過SparkContext創建了一個SparkConf對象,SparkConf對象包含應用的基本信息。
我們基於Spark API編寫應用,所以我們需要編寫一個名為“simple.sbt”的sbt配置文件,用於指明Spark為該應用的一個依賴。下麵的sbt配置文件示例中,還增加了Spark的一個依賴庫“spark-core”:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

為了讓sbt正確執行,我們需要對SimpleApp.scala和simple.sbt根據sbt要求的目錄結構佈局。如果佈局正確,就可以生成該應用的JAR包,使用spark-submit命令即可運行該程式。

  • Java
/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

該示例的代碼邏輯同上一段Scala示例代碼。與Scala示例類似,首先初始化了SparkContext,通過SparkContext創建了JavaSparkContext對象。並創建了RDDs以及執行transformations操作。最後,通過繼承了spark.api.java.function.Function的類將函數傳給Spark。

在這裡,使用Maven進行編譯,Maven的pom.xml如下:

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
  </dependencies>
</project>

按照Maven的要求架構配置文件位置:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

現在,就可以使用Maven打包應用,以及使用命令./bin/spark-submit.執行該應用程式。示例如下:

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23



您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 每一個應用程式中都有一個NSNotificationCenter實例,用來協助不同的對象之間的通信,任何一個對象都可以向通知中心發佈通知(NSNotication),在通知中描述自己做什麼。其他的感興趣的對象可以申請在某個特定的通知或者特定對象發出通知時接收到這個通知。 一個通知一般包含有3個屬性:
  • 背景:假說有兩個Activity, Activity1和Activity2, 1跳轉到2,如果要在2退出程式,一般網上比較常見的說法是用 System.exit(0) 或是 android.os.Process.killProcess(android.os.Process.myPid()) 但實際應
  • MySQL 提供了資料庫的同步功能,這對我們實現資料庫的冗災、備份、恢復、負載均衡等都是有極大幫助的。本文描述了常見的同步設置方法。 一、準備伺服器 由於MySQL不同版本之間的(二進位日誌)binlog格式可能會不一樣,因此最好的搭配組合是Master的MySQL版本和Slave的版本相同或者更低
  • 最近要優化Oracle資料庫的效率,然後在網上查了很多判斷記錄是否存在的高效率方法網上有很多的建議第一種方法,我做了一個測試,但是可能數據量不夠大,42667條記錄,不知道很大的數據量是什麼一個情況網上好多高效的建議方式 select * from item where item='1B241371
  • 隨著生產數據的日誌越來越大,硬碟空間越來越小的時候,我們就需要考慮清理一下資料庫日誌,以前都是手工弄,現在找到一個語句直接自動處理,方便很多,分享一下。 DUMP TRANSACTION CMSDemo WITH NO_LOG BACKUP LOG CMSDemo WITH NO_LOG DBCC
  • SQL Server代理是所有實時資料庫的核心。代理有很多不明顯的用法,因此系統的知識,對於開發人員還是DBA都是有用的。這系列文章會通俗介紹它的很多用法。 在這個系列的前一篇文章里,你學習瞭如何在SQL Server代理作業步驟里啟動外部程式。你可以使用過時的ActiveX系統,從虛擬命令提示符里
  • 概述 變數在存儲過程中會經常被使用,變數的使用方法是一個重要的知識點,特別是在定義條件這塊比較重要。 mysql版本:5.6 變數定義和賦值 #創建資料庫 DROP DATABASE IF EXISTS Dpro; CREATE DATABASE Dpro CHARACTER SET utf8 ;
  • MySQL伺服器通過許可權表來控制用戶對資料庫的訪問,許可權表存放在mysql資料庫里,由mysql_install_db腳本初始化。這些許可權表分別user,db,table_priv,columns_priv和host。下麵分別介紹一下這些表的結構和內容: user許可權表:記錄允許連接到伺服器的用戶帳
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...