最近寫了幾個簡單的spark structured streaming 的代碼實例。 目的是熟悉spark 開發環境搭建, spark 代碼開發流程。 開發環境: 系統:win 11 java : 1.8 scala:2.13 工具:idea 2022.2 ,maven 3, git 2.37 sp ...
最近寫了幾個簡單的spark structured streaming 的代碼實例。 目的是熟悉spark 開發環境搭建, spark 代碼開發流程。
開發環境:
系統:win 11
java : 1.8
scala:2.13
工具:idea 2022.2 ,maven 3, git 2.37
spark : 3.3.2
一, 使用 spark 結構化流讀取文件數據,並做單詞統計。
代碼:
package org.example;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.concurrent.TimeoutException;
public class Main {
/*
例子:從文件中讀取流, 被定義模式,生成dataset ,使用sql api 進行分析。
*/
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
System.out.println("Hello world!");
SparkSession spark = SparkSession.builder().appName("spark streaming").config("spark.master", "local")
.config("spark.sql.warehouse.dir", "file:///app/")
.getOrCreate();
spark.sparkContext().setLogLevel("ERROR");
StructType schema =
new StructType().add("empId", DataTypes.StringType).add("empName", DataTypes.StringType)
.add("department", DataTypes.StringType);
Dataset<Row> rawData = spark.readStream().option("header", false).format("csv").schema(schema)
.csv("D:/za/spark_data/*.csv");
rawData.createOrReplaceTempView("empData");
Dataset<Row> result = spark.sql("select count(*), department from empData group by department");
StreamingQuery query = result.writeStream().outputMode("complete").format("console").start(); // 每次觸發,全表輸出
query.awaitTermination();
}
}
輸出:
二, 使用 spark 結構化流讀取socket流,做單詞統計,使用Java編程
代碼:
package org.example;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
public class SocketStreaming_wordcount {
/*
* 從socket 讀取字元流,並做word count分析
*
* */
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCount")
.config("spark.master", "local")
.getOrCreate();
// dataframe 表示 socket 字元流
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
// 把一行字元串切分為 單詞
Dataset<String> words = lines
.as(Encoders.STRING())
.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
// 對單詞分組計數
Dataset<Row> wordCounts = words.groupBy("value").count();
// 開始查詢並列印輸出到console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
}
}
輸出:
二, 使用 spark 結構化流讀取socket流,做單詞統計,使用scala 編程
代碼:
package org.example
import org.apache.spark.sql.SparkSession
object Main {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("streaming_socket_scala")
.config("spark.master", "local")
.getOrCreate()
import spark.implicits._
// 創建datafram 象徵從網路socket 接收流
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 切分一行成單詞
val words = lines.as[String].flatMap(_.split(" "))
// 進行單詞統計
val wordCounts = words.groupBy("value").count()
// 開始查詢並輸出
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}
輸出:
功能比較簡單,代碼比較簡單,可以在網路上找到很多。 但是也是一個完整的spark結構流代碼開發流程。權當熟悉下開發流程。
---一------步-----一 ------個-----腳--------印----------