Problem description:
Storm version: 1.2.2; Kafka version: 2.11.
While using Storm to consume data from Kafka, the following error occurred.
[root@node01 jars]# /opt/storm-1.2.2/bin/storm jar MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo stormkafka
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/storm-1.2.2/lib/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/jars/MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Running: /usr/java/jdk1.8.0_181/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/storm-1.2.2 -Dstorm.log.dir=/opt/storm-1.2.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/storm-1.2.2/*:/opt/storm-1.2.2/lib/*:/opt/storm-1.2.2/extlib/*:MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar:/opt/storm-1.2.2/conf:/opt/storm-1.2.2/bin -Dstorm.jar=MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo stormkafka
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/storm-1.2.2/lib/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/jars/MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
1329 [main] INFO  o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'auto.offset.reset' to 'earliest' to ensure at-least-once processing
1338 [main] INFO  o.a.s.k.s.KafkaSpoutConfig - Setting Kafka consumer property 'enable.auto.commit' to 'false', because the spout does not support auto-commit
【run on cluster】
1617 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old null
1699 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -9173161025072727826:-6858502481790933429
1857 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1917 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : node01:6627
1947 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
1948 [main] WARN  o.a.s.u.NimbusClient - Using deprecated config nimbus.host for backward compatibility. Please update your storm.yaml so it only has config nimbus.seeds
1950 [main] INFO  o.a.s.u.NimbusClient - Found leader nimbus : node01:6627
1998 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - jars...
1998 [main] INFO  o.a.s.StormSubmitter - Uploading dependencies - artifacts...
1998 [main] INFO  o.a.s.StormSubmitter - Dependency Blob keys - jars : [] / artifacts : []
2021 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar MyProject-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /var/storm/nimbus/inbox/stormjar-ce16c5f2-db05-4d0c-8c55-01512ed64ee7.jar
3832 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /var/storm/nimbus/inbox/stormjar-ce16c5f2-db05-4d0c-8c55-01512ed64ee7.jar
3832 [main] INFO  o.a.s.StormSubmitter - Submitting topology stormkafka in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-9173161025072727826:-6858502481790933429","topology.workers":1,"topology.debug":true}
3832 [main] WARN  o.a.s.u.Utils - STORM-VERSION new 1.2.2 old 1.2.2
5588 [main] WARN  o.a.s.StormSubmitter - Topology submission exception: Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout]
Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
	at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
	at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
	at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
	at com.suhaha.storm.storm122_kafka211_demo02.KafkaTopoDemo.main(KafkaTopoDemo.java:47)
Caused by: InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout])
	at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
	at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
	at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
	at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
	at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
	at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
	at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
	at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
	... 3 more
The error means: when the mybolt component subscribes to messages from the kafka_spout component, the default stream it tries to consume from does not exist.
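For context, a component's outgoing streams in Storm come from what it declares: declare(...) in declareOutputFields publishes to the stream named "default", while declareStream(...) publishes to a custom stream id, and a downstream bolt can only subscribe to stream ids that were actually declared. Here is a minimal sketch with a hypothetical bolt (StreamIdDemoBolt is not part of this project) showing both forms:

package com.suhaha.storm.storm122_kafka211_demo02;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt, for illustration only: it declares (and emits to) both the
// "default" stream and a custom stream named "stream1".
public class StreamIdDemoBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        collector.emit(new Values(input.getValue(0)));               // goes to stream "default"
        collector.emit("stream1", new Values(input.getValue(0)));    // goes to stream "stream1"
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));                     // declares the "default" stream
        declarer.declareStream("stream1", new Fields("value"));    // declares the "stream1" stream
    }
}

The storm-kafka-client spout does the equivalent through its RecordTranslator, which is why the stream id chosen in newKafkaSpoutConfig() below matters.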
The error above comes from a mistake in the code. The code is posted below.
1) The KafkaTopoDemo class (the main-method entry class and the KafkaSpout setup)
 1 package com.suhaha.storm.storm122_kafka211_demo02;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerConfig;
 4 import org.apache.storm.Config;
 5 import org.apache.storm.LocalCluster;
 6 import org.apache.storm.StormSubmitter;
 7 import org.apache.storm.generated.AlreadyAliveException;
 8 import org.apache.storm.generated.AuthorizationException;
 9 import org.apache.storm.generated.InvalidTopologyException;
10 import org.apache.storm.kafka.spout.*;
11 import org.apache.storm.topology.TopologyBuilder;
12 import org.apache.storm.tuple.Fields;
13 import org.apache.storm.tuple.Values;
14 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
15 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
16 
17 /**
18  * @author suhaha
19  * @create 2019-04-28 00:44
20  * @comment Storm consumes Kafka data
21  */
22 
23 public class KafkaTopoDemo {
24     public static void main(String[] args) {
25         final TopologyBuilder topologybuilder = new TopologyBuilder();
26         // simple, unreliable spout
27         // topologybuilder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("node01:9092,node02:9092,node03:9092", "topic01").build()));
28 
29         // detailed spout setup: a method below builds the KafkaSpoutConfig
30         topologybuilder.setSpout("kafka_spout", new KafkaSpout<String,String>(newKafkaSpoutConfig("topic01")));
31 
32         topologybuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log")).shuffleGrouping("kafka_spout");
33 
34         // the topology is defined above; now set the Storm config
35         Config stormConf = new Config();
36         stormConf.setNumWorkers(1);
37         stormConf.setDebug(true);
38 
39         if (args != null && args.length > 0) {   // submit to the cluster
40             System.out.println("【run on cluster】");
41 
42             try {
43                 StormSubmitter.submitTopology(args[0], stormConf, topologybuilder.createTopology());
44             } catch (AlreadyAliveException e) {
45                 e.printStackTrace();
46             } catch (InvalidTopologyException e) {
47                 e.printStackTrace();
48             } catch (AuthorizationException e) {
49                 e.printStackTrace();
50             }
51             System.out.println("submission finished");
52 
53         } else {   // submit locally
54             System.out.println("【run on local】");
55             LocalCluster lc = new LocalCluster();
56             lc.submitTopology("storm_kafka", stormConf, topologybuilder.createTopology());
57         }
58     }
59 
60 
61     /**
62      * KafkaSpoutConfig setup
63      */
64     private static KafkaSpoutConfig<String,String> newKafkaSpoutConfig(String topic) {
65         ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
66                 (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
67                 new Fields("topic", "partition", "offset", "key", "value"), "stream1");
68         // bootstrap servers and topic
69         return KafkaSpoutConfig.builder("node01:9092,node02:9092,node03:9092", topic)
70                 .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())   // set the Kafka consumer group property "group.id"
71                 .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
72                 .setRecordTranslator(trans)   // controls how the spout turns a Kafka consumer message into a tuple, and which stream the tuple is emitted to
73                 .setRetry(getRetryService())   // retry policy
74                 .setOffsetCommitPeriodMs(10_000)
75                 .setFirstPollOffsetStrategy(EARLIEST)   // lets you set where to start consuming from
76                 .setMaxUncommittedOffsets(250)
77                 .build();
78     }
79 
80     /**
81      * Retry policy setup
82      */
83     protected static KafkaSpoutRetryService getRetryService() {
84         return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
85                 TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
86     }
87 }
2) The bolt class (not actually related to the problem)
package com.suhaha.storm.storm122_kafka211_demo02;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;

/**
 * @author suhaha
 * @create 2019-04-28 01:05
 * @comment The logic in this bolt is very simple: it pulls each field out of the input tuple,
 *          prints it, and also writes it to the file given by path (note that the file ends up
 *          on the worker that runs this bolt task, not under a path on the nimbus server, and
 *          not necessarily on the server from which the Storm job was submitted)
 */

public class MyBolt implements IRichBolt {
    private FileWriter fileWriter = null;
    String path = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        try {
            fileWriter = new FileWriter(path);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Constructor
     * @param path
     */
    public MyBolt(String path) {
        this.path = path;
    }

    @Override
    public void execute(Tuple input) {
        System.out.println(input);
        try {
            // read each field from the input tuple, both by index and by field name
            System.out.println("=========================");
            String topic = input.getString(0);
            System.out.println("index 0 --> " + topic);    // topic
            System.out.println("topic --> " + input.getStringByField("topic"));

            System.out.println("-------------------------");
            System.out.println("index 1 --> " + input.getInteger(1));    // partition
            Integer partition = input.getIntegerByField("partition");
            System.out.println("partition-> " + partition);

            System.out.println("-------------------------");
            Long offset = input.getLong(2);
            System.out.println("index 2 --> " + offset);    // offset
            System.out.println("offset----> " + input.getLongByField("offset"));

            System.out.println("-------------------------");
            String key = input.getString(3);
            System.out.println("index 3 --> " + key);    // key
            System.out.println("key-------> " + input.getStringByField("key"));

            System.out.println("-------------------------");
            String value = input.getString(4);
            System.out.println("index 4 --> " + value);    // value
            System.out.println("value--> " + input.getStringByField("value"));

            String info = "topic: " + topic + ", partition: " + partition + ", offset: " + offset + ", key: " + key + ", value: " + value + "\n";
            System.out.println("info = " + info);
            fileWriter.write(info);
            fileWriter.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}
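One side note unrelated to the error: the bolt never closes its FileWriter. A small sketch of a cleanup() body that does (bearing in mind Storm only guarantees cleanup() runs in local mode, so on a cluster this is best-effort):

    @Override
    public void cleanup() {
        // Best-effort close of the file handle when the bolt shuts down.
        try {
            if (fileWriter != null) {
                fileWriter.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }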
The error is in the KafkaTopoDemo class; the relevant lines are 67 and 32 of the listing above.
The cause: when the RecordTranslator is configured (line 67), the stream id is set to stream1, while mybolt is wired into the topologyBuilder (line 32) with shuffleGrouping("kafka_spout"). The error actually has nothing to do with the grouping strategy itself, only with how it is declared: the one-argument form shuffleGrouping(String componentId) makes mybolt consume from the upstream component's default stream. Because the code sets the upstream (kafka_spout) stream id to stream1, the spout never declares a default stream, and submission fails with InvalidTopologyException(msg:Component: [mybolt] subscribes from non-existent stream: [default] of component [kafka_spout]), i.e. the default stream does not exist.
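To make the two forms concrete, here is a minimal sketch (the class GroupingDemo is hypothetical; the two bolt declarations are shown side by side only for comparison, a real topology would declare mybolt once). shuffleGrouping(componentId) is shorthand for subscribing to Utils.DEFAULT_STREAM_ID, i.e. the stream named "default":

package com.suhaha.storm.storm122_kafka211_demo02;

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class GroupingDemo {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // One-argument behavior: subscribes to the upstream "default" stream,
        // which this post's kafka_spout never declares -- this is the broken wiring.
        builder.setBolt("mybolt_broken", new MyBolt("/tmp/storm_test.log"))
                .shuffleGrouping("kafka_spout", Utils.DEFAULT_STREAM_ID);   // identical to shuffleGrouping("kafka_spout")

        // Two-argument overload: subscribes to the explicitly named stream,
        // matching the "stream1" id set in the ByTopicRecordTranslator.
        builder.setBolt("mybolt_fixed", new MyBolt("/tmp/storm_test.log"))
                .shuffleGrouping("kafka_spout", "stream1");
    }
}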
The fix, then, is to declare mybolt's grouping with shuffleGrouping(String componentId, String streamId) and explicitly name stream1 as the stream to read from; with that change the error no longer occurs:
topologybuilder.setBolt("mybolt", new MyBolt("/tmp/storm_test.log")).shuffleGrouping("kafka_spout", "stream1");
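An alternative fix, sketched under an assumption worth verifying against the storm-kafka-client 1.2.2 javadoc: ByTopicRecordTranslator appears to also offer a two-argument constructor that publishes to the "default" stream, so dropping the custom stream id in newKafkaSpoutConfig() would let the original shuffleGrouping("kafka_spout") stand unchanged.

// Assumed two-argument constructor (no stream id, so tuples go to "default"):
ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
        (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
        new Fields("topic", "partition", "offset", "key", "value"));

Either way, the rule is the same: the stream id a bolt subscribes to must match a stream id its upstream component actually declares.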