閱識風雲是華為雲信息大咖,擅長將複雜信息多元化呈現,其出品的一張圖(雲圖說)、深入淺出的博文(雲小課)或短視頻(雲視廳)總有一款能讓您快速上手華為雲。更多精彩內容請單擊此處。 摘要:Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式數據的能力。本文介 ...
閱識風雲是華為雲信息大咖,擅長將複雜信息多元化呈現,其出品的一張圖(雲圖說)、深入淺出的博文(雲小課)或短視頻(雲視廳)總有一款能讓您快速上手華為雲。更多精彩內容請單擊此處。
摘要:Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式數據的能力。本文介紹如何使用MRS集群運行Spark Streaming作業消費Kafka數據。
本文分享自華為雲社區《【雲小課】EI第48課 MRS數據分析-通過Spark Streaming作業消費Kafka數據》,作者: 閱識風雲 。
Spark是分散式批處理框架,提供分析挖掘與迭代式記憶體計算能力,支持多種語言(Scala/Java/Python)的應用開發。
Spark Streaming是一種構建在Spark上的實時計算框架,擴展了Spark處理大規模流式數據的能力。本文介紹如何使用MRS集群運行Spark Streaming作業消費Kafka數據。
在本案例中,假定某個業務Kafka每1秒就會收到1個單詞記錄。基於業務需要,開發的Spark應用程式實現實時累加計算每個單詞的記錄總數的功能。
本案例基本操作流程如下所示:
- 創建MRS集群。
- 準備應用程式。
- 上傳Jar包及源數據。
- 運行作業並查看結果。
場景描述
Spark提供分析挖掘與迭代式記憶體計算能力, 適用以下場景:
- 數據處理(Data Processing):可以用來快速處理數據,兼具容錯性和可擴展性。
- 迭代計算(Iterative Computation):支持迭代計算,有效應對多步的數據處理邏輯。
- 數據挖掘(Data Mining):在海量數據基礎上進行複雜的挖掘分析,可支持各種數據挖掘和機器學習演算法。
- 流式處理(Streaming Processing):支持秒級延遲的流式處理,可支持多種外部數據源。
- 查詢分析(Query Analysis):支持標準SQL查詢分析,同時提供DSL(DataFrame), 並支持多種外部輸入。
當前Spark支持兩種數據處理方式:Direct Streaming和Receiver方式。
Direct Streaming方式主要通過採用Direct API對數據進行處理。以Kafka Direct介面為例,與啟動一個Receiver來連續不斷地從Kafka中接收數據並寫入到WAL中相比,Direct API簡單地給出每個batch區間需要讀取的偏移量位置。然後,每個batch的Job被運行,而對應偏移量的數據在Kafka中已準備好。這些偏移量信息也被可靠地存儲在checkpoint文件中,應用失敗重啟時可以直接讀取偏移量信息。
Direct Kafka介面數據傳輸
需要註意的是,Spark Streaming可以在失敗後重新從Kafka中讀取並處理數據段。然而,由於語義僅被處理一次,重新處理的結果和沒有失敗處理的結果是一致的。
因此,Direct API消除了需要使用WAL和Receivers的情況,且確保每個Kafka記錄僅被接收一次,這種接收更加高效。使得Spark Streaming和Kafka可以很好地整合在一起。總體來說,這些特性使得流處理管道擁有高容錯性、高效性及易用性,因此推薦使用Direct Streaming方式處理數據。
在一個Spark Streaming應用開始時(也就是Driver開始時),相關的StreamingContext(所有流功能的基礎)使用SparkContext啟動Receiver成為長駐運行任務。這些Receiver接收並保存流數據到Spark記憶體中以供處理。用戶傳送數據的生命周期如圖1-2所示:
數據傳輸生命周期
- 接收數據(藍色箭頭)
Receiver將數據流分成一系列小塊,存儲到Executor記憶體中。另外,在啟用預寫日誌(Write-ahead Log,簡稱WAL)以後,數據同時還寫入到容錯文件系統的預寫日誌中。 - 通知Driver(綠色箭頭)
接收塊中的元數據(Metadata)被髮送到Driver的StreamingContext。這個元數據包括:
定位其在Executor記憶體中數據位置的塊Reference ID。
若啟用了WAL,還包括塊數據在日誌中的偏移信息。 - 處理數據(紅色箭頭)
對每個批次的數據,StreamingContext使用Block信息產生RDD及其Job。StreamingContext通過運行任務處理Executor記憶體中的Block來執行Job。 - 周期性地設置檢查點(橙色箭頭)
- 為了容錯的需要,StreamingContext會周期性地設置檢查點,並保存到外部文件系統中。
華為雲MapReduce服務提供了Spark服務多種場景下的樣例工程,本案例對應示例場景的開發思路:
- 接收Kafka中數據,生成相應DStream。
- 對單詞記錄進行分類統計。
- 計算結果,併進行列印。
步驟1:創建MRS集群
1、創建併購買一個包含有Spark2x、Kafka組件的MRS集群,詳情請參見MRS用戶指南的“購買自定義集群”。
說明:本文以購買的MRS 3.1.0版本的集群為例,集群未開啟Kerberos認證。
2、集群購買成功後,在MRS集群的任一節點內,安裝集群客戶端,具體操作可參考MRS快速入門的“安裝並使用集群客戶端”。
例如客戶端安裝目錄為“/opt/client”。
步驟2:準備應用程式
1、通過開源鏡像站獲取樣例工程。
下載樣例工程的Maven工程源碼和配置文件,併在本地配置好相關開發工具,可參考MRS開髮指南(普通版_3.x)的“通過開源鏡像站獲取樣例工程”。
根據集群版本選擇對應的分支,下載並獲取MRS相關樣例工程。
例如本章節場景對應示例為“SparkStreamingKafka010JavaExample”樣例。
2、本地使用IDEA工具導入樣例工程,等待Maven工程下載相關依賴包,具體操作可參考考MRS開髮指南(普通版_3.x)的Spark開髮指南(普通模式)的“配置並導入樣例工程”。
在本示例工程中,通過使用Streaming調用Kafka介面來獲取單詞記錄,然後把單詞記錄分類統計,得到每個單詞記錄數,關鍵代碼片段如下:
public class StreamingExampleProducer { public static void main(String[] args) throws IOException { if (args.length < 2) { printUsage(); } String brokerList = args[0]; String topic = args[1]; String filePath = "/home/data/"; //源數據獲取路徑 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int m = 0; m < Integer.MAX_VALUE / 2; m++) { File dir = new File(filePath); File[] files = dir.listFiles(); if (files != null) { for (File file : files) { if (file.isDirectory()) { System.out.println(file.getName() + "This is a directory!"); } else { BufferedReader reader = null; reader = new BufferedReader(new FileReader(filePath + file.getName())); String tempString = null; while ((tempString = reader.readLine()) != null) { // Blank line judgment if (!tempString.isEmpty()) { producer.send(new ProducerRecord<String, String>(topic, tempString)); } } // make sure the streams are closed finally. reader.close(); } } } try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } private static void printUsage() { System.out.println("Usage: {brokerList} {topic}"); } }
3、本地配置好Maven及SDK相關參數後,樣例工程會自動載入相關依賴包。載入完畢後,執行package打包,獲取打包後的Jar文件。
例如打包後的Jar文件為“SparkStreamingKafka010JavaExample-1.0.jar”。
步驟3:上傳Jar包及源數據
1、準備向Kafka發送的源數據,例如如下的“input_data.txt”文件,將該文件上傳到客戶端節點的“/home/data”目錄下。
ZhangSan
LiSi
WangwWU
Tom
Jemmmy
LinDa
2、將編譯後的Jar包上傳到客戶端節點,例如上傳到“/opt”目錄。
說明:如果本地網路無法直接連接客戶端節點上傳文件,可先將jar文件或者源數據上傳至OBS文件系統中,然後通過MRS管理控制台集群內的“文件管理”頁面導入HDFS中,再通過HDFS客戶端使用hdfs dfs -get命令下載到客戶端節點本地。
步驟4:運行作業並查看結果
1、使用root用戶登錄安裝了集群客戶端的節點。
cd /opt/client
source bigdata_env
2、創建用於接收數據的Kafka Topic。
kafka-topics.sh --create --zookeeper quorumpeer實例IP地址:ZooKeeper客戶端連接埠/kafka --replication-factor 2 --partitions 3 --topic topic名稱
quorumpeer實例IP地址可登錄集群的FusionInsight Manager界面,在“集群 > 服務 > ZooKeeper > 實例”界面中查詢,多個地址可用“,”分隔。ZooKeeper客戶端連接埠可通過ZooKeeper服務配置參數“clientPort”查詢,預設為2181。
例如執行以下命令:
kafka-topics.sh --create --zookeeper 192.168.0.17:2181/kafka --replication-factor 2 --partitions 2 --topic sparkkafka
返回結果如下:
Created topic sparkkafka.
3、Topic創建成功後,運行程式向Kafka發送數據。
java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer Broker實例IP地址:Kafka連接埠 topic名稱
Kafka Broker實例IP地址可登錄集群的FusionInsight Manager界面,在“集群 > 服務 > Kafka > 實例”界面中查詢,多個地址可用“,”分隔。Broker埠號可通過Kafka服務配置參數“port”查詢,預設為9092。
例如執行以下命令:
java -cp /opt/SparkStreamingKafka010JavaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.131:9092 sparkkafka
返回結果如下:
... transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 2022-06-08 15:43:42 INFO AppInfoParser:117 - Kafka version: xxx 2022-06-08 15:43:42 INFO AppInfoParser:118 - Kafka commitId: xxx 2022-06-08 15:43:42 INFO AppInfoParser:119 - Kafka startTimeMs: xxx 2022-06-08 15:43:42 INFO Metadata:259 - [Producer clientId=DemoProducer] Cluster ID: d54RYHthSUishVb6nTHP0A
4、重新打開一個客戶端連接視窗,執行以下命令,讀取Kafka Topic中的數據。
cd /opt/client/Spark2x/spark source bigdata_env bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>
<checkPointDir>指應用程式結果備份到HDFS的路徑,自行指定即可,例如“/tmp”。
<brokers>指獲取元數據的Kafka地址,格式為“Broker實例IP地址:Kafka連接埠”。
<topic>指讀取Kafka上的topic名稱。
<batchTime>指Streaming分批的處理間隔,例如設置為“5”。
例如執行以下命令:
cd /opt/client/Spark2x/spark source bigdata_env bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar /tmp 192.168.0.131:9092 sparkkafka 5
程式運行後,可查看到Kafka中數據的統計結果:
.... ------------------------------------------- Time: 1654674380000 ms ------------------------------------------- (ZhangSan,6) (Tom,6) (LinDa,6) (WangwWU,6) (LiSi,6) (Jemmmy,6) ------------------------------------------- Time: 1654674385000 ms ------------------------------------------- (ZhangSan,717) (Tom,717) (LinDa,717) (WangwWU,717) (LiSi,717) (Jemmmy,717) ------------------------------------------- Time: 1654674390000 ms ------------------------------------------- (ZhangSan,2326) (Tom,2326) (LinDa,2326) (WangwWU,2326) (LiSi,2326) (Jemmmy,2326) ...
5、登錄FusionInsight Manager界面,單擊“集群 > 服務 > Spark2x”。
6、在服務概覽頁面點擊Spark WebUI後的鏈接地址,可進入History Server頁面。
單擊待查看的App ID,您可以查看Spark Streaming作業的狀態。
----結束
好了,本期雲小課就介紹到這裡,快去體驗MapReduce(MRS)更多功能吧!猛戳這裡