Spark快速入門 Spark 1.6.0 轉載請註明出處: "http://www.cnblogs.com/BYRans/" 快速入門(Quick Start) 本文簡單介紹了Spark的使用方式。首先介紹Spark的交互界面的API使用,然後介紹如何使用Java、Scala以及Python編寫S
Spark快速入門 - Spark 1.6.0
轉載請註明出處:http://www.cnblogs.com/BYRans/
快速入門(Quick Start)
本文簡單介紹了Spark的使用方式。首先介紹Spark的交互界面的API使用,然後介紹如何使用Java、Scala以及Python編寫Spark應用。詳細的介紹請閱讀Spark Programming Guide。
在按照本文進行操作之前,請確保已安裝Spark。本文中的所有操作沒有使用HDFS,所以您可以安裝任何版本的Hadoop。
Spark互動式Shell的使用(Interactive Analysis with the Spark Shell)
基礎(Basics)
Spark的互動式Shell提供了一個簡單的方式來學習Spark的API,同時也提供了強大的互動式數據處理能力。Spark Shell支持Scala和Python兩種語言。啟動支持Scala的Spark Shell方式為
./bin/spark-shell
Spark最重要的一個抽象概念是彈性分散式數據集(Resilient Distributed Dataset)簡稱RDD。RDDs可以通過Hadoop InputFormats(例如HDFS文件)創建,也可以由其它RDDs轉換而來。下麵的例子是通過載入Spark目錄下的README.md文件生成RDD的例子:
scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
RDDs有兩種操作:
- actions:返回計算值
- transformations:返回一個新RDDs的引用
actions示例如下:
scala> textFile.count() // Number of items in this RDD
res0: Long = 126
scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark
如下transformations示例,使用filter操作返回了一個新的RDD,該RDD為文件中數據項的子集,該子集符合過濾條件:
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
Spark也支持將actions和transformations一起使用:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
更多RDD操作(More on RDD Operations)
RDD的actions和transformations操作可以用於更加複雜的計算。下麵是查找README.md文件中單詞數最多的行的單詞數目:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
上面代碼中,第一個map操作將一行文本按空格分隔,並計算單詞數目,將line映射為一個integer值,並創建了一個新的RDD保存這些integer值。RDD調用reduce計算最大的單詞數。示例中map和reduce操作的參數是Scala的函數式編程風格,Spark支持Scala、Java、Python的編程風格,並支持Scala/Java庫。例如,使用Scala中的Math.max()函數讓程式變得更加簡潔易讀:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
隨著Hadoop的流行,MapReduce變為一種常見的數據流模式。Spark可以輕鬆的實現MapReduce,使用Spark編寫MapReduce程式更加簡單:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
上面示例中,使用flatMap、map和reduceByKey操作來計算每個單詞在文件中出現的次數,並生成一個結構為
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
緩存(Caching)
Spark支持將數據緩存到集群的分散式記憶體中。在數據會被重覆訪問的情況下,將數據緩存到記憶體能減少數據訪問時間,從而提高運行效率。尤其是在數據分佈在幾十或幾百個節點上時,效果更加明顯。下麵為將數據linesWithSpark緩存到記憶體的示例:
scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082
scala> linesWithSpark.count()
res8: Long = 19
scala> linesWithSpark.count()
res9: Long = 19
獨立應用(Self-Contained Applications)
假設我們想使用Spark API編寫獨立應用程式。我們可以使用Scala、Java和Python輕鬆的編寫Spark應用。下麵示例為一個簡單的應用示例:
- Scala
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}
上面程式分別統計了README中包含字元‘a’以及‘b’的行數。與前面Spark shell例子不同的是,我們需要初始化SparkContext。
我們通過SparkContext創建了一個SparkConf對象,SparkConf對象包含應用的基本信息。
我們基於Spark API編寫應用,所以我們需要編寫一個名為“simple.sbt”的sbt配置文件,用於指明Spark為該應用的一個依賴。下麵的sbt配置文件示例中,還增加了Spark的一個依賴庫“spark-core”:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
為了讓sbt正確執行,我們需要對SimpleApp.scala和simple.sbt根據sbt要求的目錄結構佈局。如果佈局正確,就可以生成該應用的JAR包,使用spark-submit命令即可運行該程式。
- Java
/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logData = sc.textFile(logFile).cache();
long numAs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("a"); }
}).count();
long numBs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("b"); }
}).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
}
}
該示例的代碼邏輯同上一段Scala示例代碼。與Scala示例類似,首先初始化了SparkContext,通過SparkContext創建了JavaSparkContext對象。並創建了RDDs以及執行transformations操作。最後,通過繼承了spark.api.java.function.Function
的類將函數傳給Spark。
在這裡,使用Maven進行編譯,Maven的pom.xml如下:
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
</project>
按照Maven的要求架構配置文件位置:
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
現在,就可以使用Maven打包應用,以及使用命令./bin/spark-submit.
執行該應用程式。示例如下:
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23