雲小課|MRS數據分析-通過Spark Streaming作業消費Kafka數據

来源:https://www.cnblogs.com/huaweiyun/archive/2023/02/23/17148597.html
-Advertisement-
Play Games

閱識風雲是華為雲信息大咖,擅長將複雜信息多元化呈現,其出品的一張圖(雲圖說)、深入淺出的博文(雲小課)或短視頻(雲視廳)總有一款能讓您快速上手華為雲。更多精彩內容請單擊此處。 摘要:Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式數據的能力。本文介 ...


閱識風雲是華為雲信息大咖,擅長將複雜信息多元化呈現,其出品的一張圖(雲圖說)、深入淺出的博文(雲小課)或短視頻(雲視廳)總有一款能讓您快速上手華為雲。更多精彩內容請單擊此處。

摘要:Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式數據的能力。本文介紹如何使用MRS集群運行Spark Streaming作業消費Kafka數據。

本文分享自華為雲社區《【雲小課】EI第48課 MRS數據分析-通過Spark Streaming作業消費Kafka數據》,作者: 閱識風雲 。

Spark是分散式批處理框架,提供分析挖掘與迭代式記憶體計算能力,支持多種語言(Scala/Java/Python)的應用開發。

Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式數據的能力。本文介紹如何使用MRS集群運行Spark Streaming作業消費Kafka數據。

在本案例中,假定某個業務Kafka每1秒就會收到1個單詞記錄。基於業務需要,開發的Spark應用程式實現實時累加計算每個單詞的記錄總數的功能。

本案例基本操作流程如下所示:

  1. 創建MRS集群。
  2. 準備應用程式。
  3. 上傳Jar包及源數據。
  4. 運行作業並查看結果。

場景描述

Spark提供分析挖掘與迭代式記憶體計算能力, 適用以下場景:

  • 數據處理(Data Processing):可以用來快速處理數據,兼具容錯性和可擴展性。
  • 迭代計算(Iterative Computation):支持迭代計算,有效應對多步的數據處理邏輯。
  • 數據挖掘(Data Mining):在海量數據基礎上進行複雜的挖掘分析,可支持各種數據挖掘和機器學習演算法。
  • 流式處理(Streaming Processing):支持秒級延遲的流式處理,可支持多種外部數據源。
  • 查詢分析(Query Analysis):支持標準SQL查詢分析,同時提供DSL(DataFrame), 並支持多種外部輸入。

當前Spark支持兩種數據處理方式:Direct Streaming和Receiver方式。

Direct Streaming方式主要通過採用Direct API對數據進行處理。以Kafka Direct介面為例,與啟動一個Receiver來連續不斷地從Kafka中接收數據並寫入到WAL中相比,Direct API簡單地給出每個batch區間需要讀取的偏移量位置。然後,每個batch的Job被運行,而對應偏移量的數據在Kafka中已準備好。這些偏移量信息也被可靠地存儲在checkpoint文件中,應用失敗重啟時可以直接讀取偏移量信息。

Direct Kafka介面數據傳輸

需要註意的是,Spark Streaming可以在失敗後重新從Kafka中讀取並處理數據段。然而,由於語義僅被處理一次,重新處理的結果和沒有失敗處理的結果是一致的。

因此,Direct API消除了需要使用WAL和Receivers的情況,且確保每個Kafka記錄僅被接收一次,這種接收更加高效。使得Spark Streaming和Kafka可以很好地整合在一起。總體來說,這些特性使得流處理管道擁有高容錯性、高效性及易用性,因此推薦使用Direct Streaming方式處理數據。

在一個Spark Streaming應用開始時(也就是Driver開始時),相關的StreamingContext(所有流功能的基礎)使用SparkContext啟動Receiver成為長駐運行任務。這些Receiver接收並保存流數據到Spark記憶體中以供處理。用戶傳送數據的生命周期如圖1-2所示:

數據傳輸生命周期

  1. 接收數據(藍色箭頭)
    Receiver將數據流分成一系列小塊,存儲到Executor記憶體中。另外,在啟用預寫日誌(Write-ahead Log,簡稱WAL)以後,數據同時還寫入到容錯文件系統的預寫日誌中。
  2. 通知Driver(綠色箭頭)
    接收塊中的元數據(Metadata)被髮送到Driver的StreamingContext。這個元數據包括:
    定位其在Executor記憶體中數據位置的塊Reference ID。
    若啟用了WAL,還包括塊數據在日誌中的偏移信息。
  3. 處理數據(紅色箭頭)
    對每個批次的數據,StreamingContext使用Block信息產生RDD及其Job。StreamingContext通過運行任務處理Executor記憶體中的Block來執行Job。
  4. 周期性地設置檢查點(橙色箭頭)
  5. 為了容錯的需要,StreamingContext會周期性地設置檢查點,並保存到外部文件系統中。

華為雲MapReduce服務提供了Spark服務多種場景下的樣例工程,本案例對應示例場景的開發思路:

  1. 接收Kafka中數據,生成相應DStream。
  2. 對單詞記錄進行分類統計。
  3. 計算結果,併進行列印。

步驟1:創建MRS集群

1、創建併購買一個包含有Spark2x、Kafka組件的MRS集群,詳情請參見MRS用戶指南的“購買自定義集群”。

說明:本文以購買的MRS 3.1.0版本的集群為例,集群未開啟Kerberos認證。

2、集群購買成功後,在MRS集群的任一節點內,安裝集群客戶端,具體操作可參考MRS快速入門的“安裝並使用集群客戶端”。

例如客戶端安裝目錄為“/opt/client”。

步驟2:準備應用程式

1、通過開源鏡像站獲取樣例工程。

下載樣例工程的Maven工程源碼和配置文件,併在本地配置好相關開發工具,可參考MRS開髮指南(普通版_3.x)的“通過開源鏡像站獲取樣例工程”。

根據集群版本選擇對應的分支,下載並獲取MRS相關樣例工程。

例如本章節場景對應示例為“SparkStreamingKafka010JavaExample”樣例。

2、本地使用IDEA工具導入樣例工程,等待Maven工程下載相關依賴包,具體操作可參考考MRS開髮指南(普通版_3.x)的Spark開髮指南(普通模式)的“配置並導入樣例工程”。

在本示例工程中,通過使用Streaming調用Kafka介面來獲取單詞記錄,然後把單詞記錄分類統計,得到每個單詞記錄數,關鍵代碼片段如下:

public class StreamingExampleProducer { 
    public static void main(String[] args) throws IOException { 
        if (args.length < 2) { 
            printUsage(); 
        } 
        String brokerList = args[0]; 
        String topic = args[1]; 
        String filePath = "/home/data/";    //源數據獲取路徑 
        Properties props = new Properties(); 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); 
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); 
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
        Producer<String, String> producer = new KafkaProducer<String, String>(props); 
 
        for (int m = 0; m < Integer.MAX_VALUE / 2; m++) { 
            File dir = new File(filePath); 
            File[] files = dir.listFiles(); 
            if (files != null) { 
                for (File file : files) { 
                    if (file.isDirectory()) { 
                        System.out.println(file.getName() + "This is a directory!"); 
                    } else { 
                        BufferedReader reader = null; 
                        reader = new BufferedReader(new FileReader(filePath + file.getName())); 
                        String tempString = null; 
                        while ((tempString = reader.readLine()) != null) { 
                            // Blank line judgment 
                            if (!tempString.isEmpty()) { 
                                producer.send(new ProducerRecord<String, String>(topic, tempString)); 
                            } 
                        } 
                        // make sure the streams are closed finally. 
                        reader.close(); 
                    } 
                } 
            } 
            try { 
                Thread.sleep(3); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 
    } 
 
    private static void printUsage() { 
        System.out.println("Usage: {brokerList} {topic}"); 
    } 
}

3、本地配置好Maven及SDK相關參數後,樣例工程會自動載入相關依賴包。載入完畢後,執行package打包,獲取打包後的Jar文件。

例如打包後的Jar文件為“SparkStreamingKafka010JavaExample-1.0.jar”。

步驟3:上傳Jar包及源數據

1、準備向Kafka發送的源數據,例如如下的“input_data.txt”文件,將該文件上傳到客戶端節點的“/home/data”目錄下。

ZhangSan 
LiSi 
WangwWU 
Tom 
Jemmmy 
LinDa

2、將編譯後的Jar包上傳到客戶端節點,例如上傳到“/opt”目錄。

說明:如果本地網路無法直接連接客戶端節點上傳文件,可先將jar文件或者源數據上傳至OBS文件系統中,然後通過MRS管理控制台集群內的“文件管理”頁面導入HDFS中,再通過HDFS客戶端使用hdfs dfs -get命令下載到客戶端節點本地。

步驟4:運行作業並查看結果

1、使用root用戶登錄安裝了集群客戶端的節點。

cd /opt/client
source bigdata_env

2、創建用於接收數據的Kafka Topic。

kafka-topics.sh --create --zookeeper quorumpeer實例IP地址:ZooKeeper客戶端連接埠/kafka --replication-factor 2 --partitions 3 --topic topic名稱

quorumpeer實例IP地址可登錄集群的FusionInsight Manager界面,在“集群 > 服務 > ZooKeeper > 實例”界面中查詢,多個地址可用“,”分隔。ZooKeeper客戶端連接埠可通過ZooKeeper服務配置參數“clientPort”查詢,預設為2181。

例如執行以下命令:

kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 2 --partitions 2 --topic sparkkafka

返回結果如下:

Created topic sparkkafka.

3、Topic創建成功後,運行程式向Kafka發送數據。

java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer Broker實例IP地址:Kafka連接埠 topic名稱

Kafka Broker實例IP地址可登錄集群的FusionInsight Manager界面,在“集群 > 服務 > Kafka > 實例”界面中查詢,多個地址可用“,”分隔。Broker埠號可通過Kafka服務配置參數“port”查詢,預設為9092。

例如執行以下命令:

java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.131:9092 sparkkafka
返回結果如下:
... 
transactional.id = null 
value.serializer = class org.apache.kafka.common.serialization.StringSerializer 
 
2022-06-08 15:43:42 INFO  AppInfoParser:117 - Kafka version: xxx 
2022-06-08 15:43:42 INFO  AppInfoParser:118 - Kafka commitId: xxx 
2022-06-08 15:43:42 INFO  AppInfoParser:119 - Kafka startTimeMs: xxx 
2022-06-08 15:43:42 INFO  Metadata:259 - [Producer clientId=DemoProducer] Cluster ID: d54RYHthSUishVb6nTHP0A

4、重新打開一個客戶端連接視窗,執行以下命令,讀取Kafka Topic中的數據。

cd /opt/client/Spark2x/spark
source bigdata_env
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>

<checkPointDir>指應用程式結果備份到HDFS的路徑,自行指定即可,例如“/tmp”。
<brokers>指獲取元數據的Kafka地址,格式為“Broker實例IP地址:Kafka連接埠”。
<topic>指讀取Kafka上的topic名稱。
<batchTime>指Streaming分批的處理間隔,例如設置為“5”。

例如執行以下命令:

cd /opt/client/Spark2x/spark
source bigdata_env
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.131:9092 sparkkafka 5

程式運行後,可查看到Kafka中數據的統計結果:

.... 
-------------------------------------------                                      
Time: 1654674380000 ms 
------------------------------------------- 
(ZhangSan,6) 
(Tom,6) 
(LinDa,6) 
(WangwWU,6) 
(LiSi,6) 
(Jemmmy,6) 
 
-------------------------------------------                                      
Time: 1654674385000 ms 
------------------------------------------- 
(ZhangSan,717) 
(Tom,717) 
(LinDa,717) 
(WangwWU,717) 
(LiSi,717) 
(Jemmmy,717) 
 
------------------------------------------- 
Time: 1654674390000 ms 
------------------------------------------- 
(ZhangSan,2326) 
(Tom,2326) 
(LinDa,2326) 
(WangwWU,2326) 
(LiSi,2326) 
(Jemmmy,2326) 
 ...

5、登錄FusionInsight Manager界面,單擊“集群 > 服務 > Spark2x”。

6、在服務概覽頁面點擊Spark WebUI後的鏈接地址,可進入History Server頁面。

單擊待查看的App ID,您可以查看Spark Streaming作業的狀態。

----結束

好了,本期雲小課就介紹到這裡,快去體驗MapReduce(MRS)更多功能吧!猛戳這裡

 

點擊關註,第一時間瞭解華為雲新鮮技術~


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 大數據時代,各行各業對數據採集的需求日益增多,網路爬蟲的運用也更為廣泛,越來越多的人開始學習網路爬蟲這項技術,K哥爬蟲此前已經推出不少爬蟲進階、逆向相關文章,為實現從易到難全方位覆蓋,特設【0基礎學爬蟲】專欄,幫助小白快速入門爬蟲,本期為 HTTP 協議的基本原理介紹。 電腦網路模型 電腦網路是 ...
  • Java基礎語法:類型轉換、變數、常量 類型轉換 低 >高 byte,short,char->int->long->float->double 從高到低:強制轉換 從低到高:自動轉換 註意點:1. 不能對布爾型進行轉換; 2. 在把高容量轉換成低容量的時候,強制轉換; 3. 轉換的時候可能存在記憶體溢 ...
  • 最佳組隊問題 雙人混合ACM程式設計競賽即將開始,因為是雙人混合賽,故每支隊伍必須由1男1女組成。現在需要對n名男隊員和n名女隊員進行配對。由於不同隊員之間的配合優勢不一樣,因此,如何組隊成了大問題。 給定n×n優勢矩陣P,其中P[i][j]表示男隊員i和女隊員j進行組隊的競賽優勢(0<P[i][j ...
  • 應用場景 在軟體系統中,經常會需要將一些現成的對象放到新的環境中使用,但是新的環境要求的介面,是這些現存對象所不能滿足的。如何能利用現有的對象,又能滿足新的引用環境所需的介面? 適配器優點 更好的復用性。如果功能已經存在,只是介面不相容,通過適配器模式就可以讓這些功能得到更好的復用。 適配器缺點 由 ...
  • 1.說說你知道的ORM框架? 2.請問對EFCore有瞭解嗎? 3.說說EFCore查詢的性能調優小技巧? 4.EFCore 如果通過數據生成實體和DbContext? 5.說說對SaveChanges的理解? 6.說說對EFCore中EntityState的理解。? 7.說說什麼是導航屬性和引用屬 ...
  • 概述 面臨一個複雜對象的創建工作,通常由各個部分的子對象用一定的演算法構成。子部件(對象)比較多,對象不能當作一個完整的對象或者產品使用(郵件:發件人,收件人、抄送人、主題、郵件內容)子部件需要按照一定的順序賦值才有一定的意義,在某個子部件沒有賦值之前,另一個子部件就無法賦值。 類圖 註:該類圖來源網 ...
  • Excel 公式引用當前單元格左側單元格 引用當前單元格左側的第一個單元格:=OFFSET(INDIRECT(ADDRESS(ROW(), COLUMN())),0,-1)。 ROW()返回當前單元格的行號,COLUMN()返回當前單元格的列號。 ADDRESS函數可以根據指定行號和列號獲得工作表中 ...
  • # MySQL調優 ## 資料庫優化常見方案 1. 優化shema,sql語句+索引2. 加緩存,memcached,redis3. 主從複製,讀寫分離4. 垂直拆分5. 水平拆分 為了知道怎麼優化SQL,必須先清楚SQL的生命周期 ## SQL生命周期 1. 應用伺服器連接資料庫伺服器,建立一個T ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...