Spark編程基礎

来源:https://www.cnblogs.com/paopaoT/archive/2023/06/28/17512541.html
-Advertisement-
Play Games

# Scala編寫Spark的WorkCount ## 創建一個Maven項目 在pom.xml中添加依賴和插件 ```XML 8 8 UTF-8 3.2.3 2.12.15 org.scala-lang scala-library ${scala.version} org.apache.spark ...


Scala編寫Spark的WorkCount

創建一個Maven項目

在pom.xml中添加依賴和插件

<!-- 定義的一些常量 -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <spark.version>3.2.3</spark.version>
    <scala.version>2.12.15</scala.version>
</properties>

<dependencies>
    <!-- scala的依賴 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <!-- spark core 即為spark內核 ,其他高級組件都要依賴spark core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>

</dependencies>

<!-- 配置Maven的鏡像庫 -->
<!-- 依賴下載國內鏡像庫 -->
<repositories>
    <repository>
        <id>nexus-aliyun</id>
        <name>Nexus aliyun</name>
        <layout>default</layout>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </repository>
</repositories>

<!-- maven插件下載國內鏡像庫 -->
<pluginRepositories>
    <pluginRepository>
        <id>ali-plugin</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>never</updatePolicy>
        </releases>
    </pluginRepository>
</pluginRepositories>

<build>
    <pluginManagement>
        <plugins>
            <!-- 編譯scala的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
            </plugin>
            <!-- 編譯java的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <!-- 打jar插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

創建一個scala目錄

選擇scala目錄,右鍵,將目錄轉成源碼包,或者點擊maven的刷新按鈕
image

編寫Spark程式

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 1.創建SparkContext
  * 2.創建RDD
  * 3.調用RDD的Transformation(s)方法
  * 4.調用Action
  * 5.釋放資源
  */
object WordCount {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("WordCount")
    //創建SparkContext,使用SparkContext來創建RDD
    val sc: SparkContext = new SparkContext(conf)
    //spark寫Spark程式,就是對抽象的神奇的大集合【RDD】編程,調用它高度封裝的API
    //使用SparkContext創建RDD
    val lines: RDD[String] = sc.textFile(args(0))

    //Transformation 開始 //
    //切分壓平
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //將單詞和一組合放在元組中
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    //分組聚合,reduceByKey可以先局部聚合再全局聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //排序
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    //Transformation 結束 //

    //調用Action將計算結果保存到HDFS中
    sorted.saveAsTextFile(args(1))
    //釋放資源
    sc.stop()
  }
}

使用maven打包

image

提交任務

• 上傳jar包到伺服器,然後使用sparksubmit命令提交任務

/bigdata/spark-3.2.3-bin-hadoop3.2/bin/spark-submit \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g --total-executor-cores 4 \
--class cn._51doit.spark.day01.WordCount \
/root/spark-in-action-1.0.jar hdfs://node-1.51doit.cn:9000/words.txt hdfs://node-1.51doit.cn:9000/out
 
參數說明:
--master 指定masterd地址和埠,協議為spark://,埠是RPC的通信埠
--executor-memory 指定每一個executor的使用的記憶體大小
--total-executor-cores指定整個application總共使用了cores
--class 指定程式的main方法全類名
jar包路徑 args0 args1
 

Java編寫Spark的WordCount

使用匿名實現類方式

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        //創建JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        //使用JavaSparkContext創建RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        //調用Transformation(s)
        //切分壓平
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        //將單詞和一組合在一起
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return Tuple2.apply(word, 1);
                    }
        });
        //分組聚合
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //排序,先調換KV的順序VK
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap();
            }
        });
        //再排序
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        //再調換順序
        JavaPairRDD<String, Integer> result = sorted.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });
        //觸發Action,將數據保存到HDFS
        result.saveAsTextFile(args[1]);
        //釋放資源
        jsc.stop();
    }
}

使用Lambda表達式方式

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
        //創建SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //創建RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        //切分壓平
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        //將單詞和一組合
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> Tuple2.apply(word, 1));
        //分組聚合
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        //調換順序
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(tp -> tp.swap());
        //排序
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        //調換順序
        JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
        //將數據保存到HDFS
        result.saveAsTextFile(args[1]);
        //釋放資源
        jsc.stop();
    }
}

本地運行Spark和Debug

spark程式每次都打包上在提交到集群上比較麻煩且不方便調試,Spark還可以進行Local模式運行,方便測試和調試

在本地運行

 //Spark程式local模型運行,local[*]是本地運行,並開啟多個線程
val conf: SparkConf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //設置為local模式執行

並輸入運行參數
hdfs://linux01:9000/words.txt hdfs://linux01:9000/out/out01

讀取HDFS中的數據

由於往HDFS中的寫入數據存在許可權問題,所以在代碼中設置用戶為HDFS目錄的所屬用戶

//往HDFS中寫入數據,將程式的所屬用戶設置成更HDFS一樣的用戶
System.setProperty("HADOOP_USER_NAME", "root")

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • .NET Core 6引入了創建單文件可執行文件的功能。這隻允許分發一個應用程式文件,因為所有配置和依賴項都包含在二進位文件本身中。 該功能為依賴項嵌入提供了一種本機方法,這在發佈生成數百個程式集的獨立應用程式時最有益。它可用於依賴於框架或自包含的應用程式,但在這兩種情況下都需要設置運行時標識符以針 ...
  • ## 一:背景 ### 1. 講故事 前段時間有個朋友找到我,說他們的程式有偶發崩潰的情況,讓我幫忙看下怎麼回事,針對這種 crash 的程式,用 AEDebug 的方式抓取一個便知,有了 dump 之後接下來就可以分析了。 ## 二:Windbg 分析 ### 1. 為什麼會崩潰 既然是程式的崩潰 ...
  • 很多人看到這個Eazfuscator.NET還不知是什麼東東。。。 首先介紹下 什麼是 Eazfuscator.NET? Eazfuscator.NET 是用於.NET平臺的工業級混淆器。 Eazfuscator.NET 提供的混淆保護了軟體中根深蒂固的知識產權,提高了商業盈利能力,並保持了競爭優勢 ...
  • 繼上篇:Taurus .Net Core 微服務開源框架:Admin 插件【1】 - 微服務節點管理,本篇繼續介紹下一個內容:系統環境信息管理...... ...
  • # 痞子衡嵌入式半月刊: 第 78 期 ![](http://henjay724.com/image/cnblogs/pzh_mcu_bi_weekly.PNG) 這裡分享嵌入式領域有用有趣的項目/工具以及一些熱點新聞,農曆年分二十四節氣,希望在每個交節之日準時發佈一期。 本期刊是開源項目(GitH ...
  • ## 前言 > 1. 檢查防火牆是否關閉 > > vim /etc/selinux/config > > SELINUX=disabled > > 2. 記憶體4G為好 > > 3. 配置好阿裡yum源 ## 實驗步驟-服務端 > 1. 獲取zabbix的下載源 > > rpm -Uvh https: ...
  • 在 Linux 系統下開發軟體,輸出的可執行文件可大可小,運行環境如果是在伺服器那麼可能資源比較充足,但如果是在嵌入式環境,那麼存儲資源是寸土必爭的。所以會有對可執行文件進行瘦身的需求,比如使用指令 strip。 ...
  • 一、對資料庫及表的基礎操作 1、連接資料庫伺服器 mysql -hlocalhost -uroot -p123456 2、2.退出伺服器 exit 3、查看所有的資料庫 show databases; 4、創建一個資料庫 create database java; 5、刪除資料庫 drop data ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...