storm報錯:Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])

来源:https://www.cnblogs.com/suhaha/archive/2019/04/29/10792472.html
-Advertisement-
Play Games

問題描述: storm版本:1.2.2,kafka版本:2.11。 在使用storm去消費kafka中的數據時,發生瞭如下錯誤。 報錯圖示如下: 報錯的意思為:mybolt這個組件,在從kafka_sput組件上消費消息時,它所消費的default數據流是不存在的。 上面的報錯是因為代碼中有地方寫錯 ...


問題描述:

  storm版本:1.2.2,kafka版本:2.11。

    在使用storm去消費kafka中的數據時,發生瞭如下錯誤。

[root@node01 jars]# /opt/storm-1.2.2/bin/storm jar MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo stormkafka
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/storm-1.2.2/lib/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/jars/MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Running: /usr/java/jdk1.8.0_181/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm-1.2.2 -Dstorm.log.dir=/opt/storm-1.2.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm-1.2.2/*:/opt/storm-1.2.2/lib/*:/opt/storm-1.2.2/extlib/*:MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar:/opt/storm-1.2.2/conf:/opt/storm-1.2.2/bin -Dstorm.jar=MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo stormkafka
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/storm-1.2.2/lib/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/jars/MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
1329 [main] INFO  o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'auto.offset.reset' to 'earliest' to ensure at-least-once processing
1338 [main] INFO  o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'enable.auto.commit' to 'false', because the spout does not support auto-commit
【run on cluster】
1617 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old null
1699 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -9173161025072727826:-6858502481790933429
1857 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1917 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : node01:6627
1947 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
1948 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1950 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : node01:6627
1998 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - jars...
1998 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - artifacts...
1998 [main] INFO  o.a.s.StormSubmitter - Dependency Blob keys - jars : [] / artifacts : []
2021 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /var/storm/nimbus/inbox/stormjar-ce16c5f2-db05-4d0c-8c55-01512ed64ee7.jar
3832 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /var/storm/nimbus/inbox/stormjar-ce16c5f2-db05-4d0c-8c55-01512ed64ee7.jar
3832 [main] INFO  o.a.s.StormSubmitter - Submitting topology stormkafka in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-9173161025072727826:-6858502481790933429","topology.workers":1,"topology.debug":true}
3832 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old 1.2.2
5588 [main] WARN  o.a.s.StormSubmitter - Topology submission exception: Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout]
Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
    at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
    at com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo.main(KafkaTopoDemo.java:47)
Caused by: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
    at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
    at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
    at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
    at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
    at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
    at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
    at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
    ... 3 more

 

報錯圖示如下:

報錯的意思為:mybolt這個組件,在從kafka_sput組件上消費消息時,它所消費的default數據流是不存在的。

上面的報錯是因為代碼中有地方寫錯了,下麵貼出代碼

1)KafkaTopoDemo類(main方法入口類和kafkaSpout設置

 1 package com.suhaha.storm.storm122_kafka211_demo02;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerConfig;
 4 import org.apache.storm.Config;
 5 import org.apache.storm.LocalCluster;
 6 import org.apache.storm.StormSubmitter;
 7 import org.apache.storm.generated.AlreadyAliveException;
 8 import org.apache.storm.generated.AuthorizationException;
 9 import org.apache.storm.generated.InvalidTopologyException;
10 import org.apache.storm.kafka.spout.*;
11 import org.apache.storm.topology.TopologyBuilder;
12 import org.apache.storm.tuple.Fields;
13 import org.apache.storm.tuple.Values;
14 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
15 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
16 
17 /**
18  * @author suhaha
19  * @create 2019-04-28 00:44
20  * @comment storm消費kafka數據
21  */
22 
23 public class KafkaTopoDemo {
24     public static void main(String[] args) {
25         final TopologyBuilder topologybuilder = new TopologyBuilder();
26         //簡單的不可靠spout
27 //        topologybuilder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("node01:9092,node02:9092,node03:9092", "topic01").build()));
28 
29         //詳細的設置spout,寫一個方法生成KafkaSpoutConfig
30         topologybuilder.setSpout("kafka_spout", new KafkaSpout<String,String>(newKafkaSpoutConfig("topic01")));
31 
32         topologybuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log")).shuffleGrouping("kafka_spout");
33 
34         //上面設置的是topology,現在設置storm配置
35         Config stormConf=new Config();
36         stormConf.setNumWorkers(1);
37         stormConf.setDebug(true);
38 
39         if (args != null && args.length > 0) {//集群提交
40             System.out.println("【run on cluster】");
41 
42             try {
43                 StormSubmitter.submitTopology(args[0], stormConf, topologybuilder.createTopology());
44             } catch (AlreadyAliveException e) {
45                 e.printStackTrace();
46             } catch (InvalidTopologyException e) {
47                 e.printStackTrace();
48             } catch (AuthorizationException e) {
49                 e.printStackTrace();
50             }
51             System.out.println("提交完成");
52 
53         } else {//本地提交
54             System.out.println("【run on local】");
55             LocalCluster lc = new LocalCluster();
56             lc.submitTopology("storm_kafka", stormConf, topologybuilder.createTopology());
57         }
58     }
59 
60 
61     /**
62      * KafkaSpoutConfig設置
63     */
64     private static KafkaSpoutConfig<String,String> newKafkaSpoutConfig(String topic) {
65         ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
66                 (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
67                 new Fields("topic", "partition", "offset", "key", "value"), "stream1");
68         //bootstrapServer 以及topic
69         return KafkaSpoutConfig.builder("node01:9092,node02:9092,node03:9092", topic)
70                 .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())//設置kafka使用者組屬性"group.id"
71                 .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
72                 .setRecordTranslator(trans)//修改spout如何將Kafka消費者message轉換為tuple,以及將該tuple發佈到哪個stream中
73                 .setRetry(getRetryService())//重試策略
74                 .setOffsetCommitPeriodMs(10_000)
75                 .setFirstPollOffsetStrategy(EARLIEST)//允許你設置從哪裡開始消費數據
76                 .setMaxUncommittedOffsets(250)
77                 .build();
78     }
79 
80     /**
81      * 重試策略設置
82     */
83     protected static KafkaSpoutRetryService getRetryService() {
84         return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
85             TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
86     }
87 }

 

 2)bolt類(跟問題沒啥關係)

 1 package com.suhaha.storm.storm122_kafka211_demo02;
 2 
 3 import org.apache.storm.task.OutputCollector;
 4 import org.apache.storm.task.TopologyContext;
 5 import org.apache.storm.topology.IRichBolt;
 6 import org.apache.storm.topology.OutputFieldsDeclarer;
 7 import org.apache.storm.tuple.Tuple;
 8 import java.io.FileWriter;
 9 import java.io.IOException;
10 import java.util.Map;
11 
12 /**
13  * @author suhaha
14  * @create 2019-04-28 01:05
15  * @comment  該bolt中的處理邏輯非常簡單,只是簡單的從input中將各類數據取出來,然後簡單的列印出來
16  *         並且將數據列印到path指定的文件中(這裡需要註意的是,最終寫出的文件是在執行該bolt task的worker上的,
17  *      而不在nimbus伺服器路徑下,也不一定在提交storm job的伺服器上)
18  */
19 
20 public class MyBolt implements IRichBolt {
21     private FileWriter fileWriter = null;
22     String path = null;
23 
24     @Override
25     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
26         try {
27             fileWriter = new FileWriter(path);
28         } catch (IOException e) {
29             e.printStackTrace();
30         }
31     }
32 
33     /**
34      * 構造方法
35      * @param path
36      */
37     public MyBolt(String path) {
38         this.path = path;
39     }
40 
41 
42     @Override
43     public void execute(Tuple input) {
44         System.out.println(input);
45         try {
46             /**
47              * 從input中獲取相應數據
48              */
49             System.out.println("=========================");
50             String topic = input.getString(0);
51             System.out.println("index 0 --> " + topic);        //topic
52             System.out.println("topic   --> " + input.getStringByField("topic"));
53 
54             System.out.println("-------------------------");
55             System.out.println("index 1 --> " + input.getInteger(1));        //partition
56             Integer partition = input.getIntegerByField("partition");
57             System.out.println("partition-> " + partition);
58 
59             System.out.println("-------------------------");
60             Long offset = input.getLong(2);
61             System.out.println("index 2 --> " + offset);        //offset
62             System.out.println("offset----> " +input.getLongByField("offset"));
63 
64             System.out.println("-------------------------");
65             String key = input.getString(3);
66             System.out.println("index 3 --> " + key);        //key
67             System.out.println("key-------> " + input.getStringByField("key"));
68 
69             System.out.println("-------------------------");
70             String value = input.getString(4);
71             System.out.println("index 4 --> " + value);        //value
72             System.out.println("value--> " + input.getStringByField("value"));
73 
74             String info = "topic: " + topic + ", partiton: " +partition + ", offset: " + offset + ", key: " + key +", value: " + value + "\n";
75             System.out.println("info = " + info);
76             fileWriter.write(info);
77             fileWriter.flush();
78         } catch (Exception e) {
79             e.printStackTrace();
80         }
81     }
82 
83     @Override
84     public void cleanup() {
85         // TODO Auto-generated method stub
86     }
87 
88     @Override
89     public void declareOutputFields(OutputFieldsDeclarer declarer) {
90         // TODO Auto-generated method stub
91     }
92 
93     @Override
94     public Map<String, Object> getComponentConfiguration() {
95         // TODO Auto-generated method stub
96         return null;
97     }
98 }

 


 

錯誤出現在KafkaTopoDemo類中,已在上面的代碼中做了黃色高亮標註。

錯誤的原因在於,在代碼中對RecordTranslator進行設置時(第67行),將數據流Id設置成了stream1;而在對topologyBuilder設置bolt時(第32行),使用的分發策略是shuffleGrouping("kafka_spout"),其實錯誤跟分發策略沒關係,但是跟分發策略的使用方式有關係——當使用shuffleGrouping(String componentId)這種方式設置分發策略時,mybolt組件預設是從上游組件的default 這個數據流中獲取數據,而在代碼中,我已將上游(kafka_spout)的數據流id設置成了stream1,故而導致了報錯(InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout]),說default數據流不存在)。

因此,需要對代碼做了相應修改,即:在設置mybolt組件的分發策略時,使用shuffleGrouping(String componentId, String streamId),手動指定要讀取的數據流id為stream1,如此,程式就不會報該錯誤了。

topologybuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log")).shuffleGrouping("kafka_spout", "stream1");

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.簡述 Yellow dog Updater, Modified由Duke University團隊,修改Yellow Dog Linux的Yellow Dog Updater開發而成,是一個基於RPM包管理的字元前端軟體包管理器。能夠從指定的伺服器自動下載RPM包並且安裝,可以處理依賴性關係,並 ...
  • 1,pcb:進程式控制制塊結構體:/usr/src/linux headers 4.15.0 29/include/linux/sched.h 進程id:系統中每個進程有唯一的id,在c語言中用pid_t類型表示,是個非負整數。 進程狀態:就緒,運行,掛起,停止等狀態 描述虛擬地址空間的信息 描述控制終 ...
  • #!/bin/shwhile [ true ]; do #查詢是否有8899正在運行的進程netstat -an|grep 8899if [ $? -ne 0 ]thennowtime=$(date +%Y-%m-%d\ %H:%M:%S)echo "end process.....">> issu ...
  • 昨天亞馬遜又掛了,為什麼是又呢,因為每年亞馬遜都要掛幾次。 昨天是什麼日子讓亞馬遜又掛了呢?不就是因為清倉促銷嗎……你的驕傲呢,高可用呢,負載均衡呢,分散式呢,三駕馬車怎麼一駕都不管用了呢? 不就是在國內弄個促銷麽……不就是被羊毛黨盯上了麽……至於麽你,一掛就是幾個小時,我反正在下午2點看看是上不去 ...
  • 1、我們可以寫一些簡單的檢查資料庫的檢查腳本 2、然後在命令行視窗執行我們腳本的時候,就會輸出我們預期的查詢結果 3、說明 1》prompt 顯示後面的提示,相當於一般的操作系統命令echo,輸出後面的信息Importing table t_test 2》set feedback off set f ...
  • 今天在工作中,在service中調用分頁查詢列表介面的時候,返回的到頁面的數據中總是存在缺失的數據,還有重覆的數據。後發現是order by導致的 ...
  • MySQL 1054錯誤的情況 在用命令插入數據時提示 1054錯誤的問題: 這種情況下的解決方法: 是因為引號的問題 ‘ ’ 。 字元串應該加上引號,解決問題。如下: ...
  • 1.Oracle中常見的數據類型分類:(A) 1.number(x,y) 數字類型,x表示最大長度,y表示精度對應java中除char外所有基本數據類型(byte、short、int、long、float、double、boolean) 2.varchar2(x) 可變字元串,x表示最大長度。對應j ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...