SparkStreaming與Kafka整合遇到的問題及解決方案

来源:http://www.cnblogs.com/wyfjk/archive/2017/07/13/7162131.html
-Advertisement-
Play Games

SparkStreaming與Kafka整合遇到的問題及解決方案 ...


前言

最近工作中是做日誌分析的平臺,採用了sparkstreaming+kafka,採用kafka主要是看中了它對大數據量處理的高性能,處理日誌類應用再好不過了,採用了sparkstreaming的流處理框架 主要是考慮到它本身是基於spark核心的,以後的批處理可以一站式服務,並且可以提供準實時服務到elasticsearch中,可以實現準實時定位系統日誌。

實現

Spark-Streaming獲取kafka數據的兩種方式-Receiver與Direct的方式。

一. 基於Receiver方式

這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的記憶體中的,然後Spark Streaming啟動的job會去處理那些數據。代碼如下:

    SparkConf sparkConf = new SparkConf().setAppName("log-etl").setMaster("local[4]");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    int numThreads = Integer.parseInt("4");
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("group-45", numThreads);
     //接收的參數分別是JavaStreamingConetxt,zookeeper連接地址,groupId,kafak的topic 
    JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, "172.16.206.27:2181,172.16.206.28:2181,172.16.206.29:2181", "1", topicMap);

剛開始的時候系統正常運行,沒有發現問題,但是如果系統異常重新啟動sparkstreaming程式後,發現程式會重覆處理已經處理過的數據,這種基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合著WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。官方現在也已經不推薦這種整合方式,官網相關地址 http://spark.apache.org/docs/latest/streaming-kafka-integration.html ,下麵我們使用官網推薦的第二種方式kafkaUtils的createDirectStream()方式。

二.基於Direct的方式

這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數據後,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的範圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的數據。

代碼如下:

SparkConf sparkConf = new SparkConf().setAppName("log-etl");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);

這種direct方式的優點如下:

1.簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream然後對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關係。

2.一次且僅一次的事務機制:基於receiver的方式,在spark和zk中通信,很有可能導致數據的不一致。

3.高效率:在receiver的情況下,如果要保證數據的不丟失,需要開啟wal機制,這種方式下,為、數據實際上被覆制了兩份,一份在kafka自身的副本中,另外一份要複製到wal中, direct方式下是不需要副本的。

三.基於Direct方式丟失消息的問題

貌似這種方式很完美,但是還是有問題的,當業務需要重啟sparkstreaming程式的時候,業務日誌依然會打入到kafka中,當job重啟後只能從最新的offset開始消費消息,造成重啟過程中的消息丟失。kafka中的offset如下圖(使用kafkaManager實時監控隊列中的消息):

 

當停止業務日誌的接受後,先重啟spark程式,但是發現job並沒有將先前打入到kafka中的數據消費掉。這是因為消息沒有經過zk,topic的offset也就沒有保存

四.解決消息丟失的處理方案

一般有兩種方式處理這種問題,可以先spark streaming 保存offset,使用spark checkpoint機制,第二種是程式中自己實現保存offset邏輯,我比較喜歡第二種方式,以為這種方式可控,所有主動權都在自己手中。

先看下大體流程圖,

SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("log-etl");
 Set<String> topicSet = new HashSet<String>();
        topicSet.add("group-45");
        kafkaParam.put("metadata.broker.list", "172.16.206.17:9092,172.16.206.31:9092,172.16.206.32:9092");
        kafkaParam.put("group.id", "simple1");

        // transform java Map to scala immutable.map
        scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam);
        scala.collection.immutable.Map<String, String> scalaKafkaParam =
                testMap.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
                    public Tuple2<String, String> apply(Tuple2<String, String> v1) {
                        return v1;
                    }
                });

        // init KafkaCluster
        kafkaCluster = new KafkaCluster(scalaKafkaParam);

        scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet);
        immutableTopics = mutableTopics.toSet();
        scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet2 = kafkaCluster.getPartitions(immutableTopics).right().get();

        // kafka direct stream 初始化時使用的offset數據
        Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>();

        // 沒有保存offset時(該group首次消費時), 各個partition offset 預設為0
        if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).isLeft()) {

            System.out.println(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).left().get());

            Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2);

            for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
                consumerOffsetsLong.put(topicAndPartition, 0L);
            }

        }
        // offset已存在, 使用保存的offset
        else {

            scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster.getConsumerOffsets("simple1", topicAndPartitionSet2).right().get();

            Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap((scala.collection.immutable.Map)consumerOffsetsTemp);

            Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2);

            for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
                Long offset = (Long)consumerOffsets.get(topicAndPartition);
                consumerOffsetsLong.put(topicAndPartition, offset);
            }

        }

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
        kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);

        // create direct stream
        JavaInputDStream<String> message = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParam,
                consumerOffsetsLong,
                new Function<MessageAndMetadata<String, String>, String>() {
                    public String call(MessageAndMetadata<String, String> v1) throws Exception {
                        System.out.println("接收到的數據《《==="+v1.message());
                        return v1.message();
                    }
                }
        );

        // 得到rdd各個分區對應的offset, 並保存在offsetRanges中
        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();

        JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
            public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                return rdd;
            }
        });

        // output
        javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> v1) throws Exception {
                if (v1.isEmpty()) return null;

                List<String> list = v1.collect();
                for(String s:list){
                    System.out.println("數據==="+s);
                }

                for (OffsetRange o : offsetRanges.get()) {

                    // 封裝topic.partition 與 offset對應關係 java Map
                    TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
                    Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>();
                    topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());

                    // 轉換java map to scala immutable.map
                    scala.collection.mutable.Map<TopicAndPartition, Object> testMap =
                            JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap);
                    scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap =
                            testMap.toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() {
                                public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) {
                                    return v1;
                                }
                            });

                    // 更新offset到kafkaCluster
                    kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap);
                       System.out.println("原數據====》"+o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
                    );
                }
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }

基本使用這種方式就可以解決數據丟失的問題。

文章來源:https://my.oschina.net/u/1792341/blog/1341327


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 阿裡雲最近推出了移動熱修複服務,聽說這個服務傻瓜式接入,性能相對較好,對新技術比較好奇的我決定嘗試一下。 移動熱修複.png 移動熱修複.png 首先,需要開通這個服務,創建應用 創建應用.png 創建應用.png 然後,在項目中接入服務。按照文檔所述,第一步:gradle遠程倉庫依賴, 打開項目找 ...
  • 一、NSThread基本概念 NSThread是基於線程使用,輕量級的多線程編程方法(相對GCD和NSOperation),一個NSThread對象代表一個線程,需要手動管理線程的生命周期,處理線程同步等問題。 二、創建、啟動線程 1、動態實例化 - 先創建再人工啟動 2、靜態實例化 - 創建後自啟 ...
  • 公司的apk越做越大。。。作為一個有追求的程式員,我覺得有必要給apk瘦身了。。。 優化之前,先來分析一下apk結構,下麵附上一張apk結構圖: apk結構.png apk結構.png 由於我這個項目集成了百度地圖、百度導航。。。所以assets和lib各自變態的占了12M+,有種蛋蛋的憂傷。。。百 ...
  • 轉載請標明出處:http://blog.csdn.net/zhaoyanjun6/article/details/73468287 本文出自 "【趙彥軍的博客】" 在 Fiddler 中自帶了一個 QuickExec 命令行,用戶可以直接輸入並快速執行腳本命令。 常見命令 help 打開官方的使用頁 ...
  • 經常需要註釋,取消註釋代碼 Ctrl + / 對每段代碼前面添加或者取消 // Ctrl + Shift + / 對代碼添加 或取消 /* */ 實用快捷鍵,持續更新中... ... ...
  • 一,代碼。 - (void)viewDidLoad { [super viewDidLoad]; // Do any additional setup after loading the view, typically from a nib. NSLog(@"--產生隨機數 %@",[self ge ...
  • xtrabackup是由percona提供的mysql備份工具,它是一款物理備份工具,通過連接資料庫把資料庫的數據備份出來。對於innodb存儲引擎其支持全量備份和增量備份。對於myisam存儲引擎只支持增量備份。因為xtrabackup對innodb的增量備份是基於表空間的LSN進行的,所謂LSN ...
  • 首先是寫一個分割字元串的函數,返回table類型 CREATE OR REPLACE FUNCTION fn_split (p_str IN VARCHAR2, p_delimiter IN VARCHAR2)RETURN ty_str_splitIS j INT := 0; i INT := 1; ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...