Ошибка носика от интеграции Apache Storm Trident и Kafka - PullRequest
0 голосов
/ 24 мая 2018

Я использую OpaqueTridentKafkaSpout для приема сообщений от Kafka.Ниже приведен код.Я проигнорировал конфигурацию max spout pending, так как это приводит к тому, что одно и то же сообщение kafka поступает несколькими партиями.

TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,properties.getProperty("topic", "mytopic"));
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);

Я получаю следующую ошибку один раз, когда запускается Kafka Spout, но после этого он работает плавно.

2018-05-29 09: 47: 21.703 oasutil Thread-9-spout-myspout-Spout-executor [33 33] [ОШИБКА] Асинхронный цикл прерван!java.lang.RuntimeException: java.lang.NullPointerException в org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor (DisruptorQueue.java:522) ~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable (DisruptorQueue.java:487) ~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.disruptor $ потребление_batch_when_available.invoke (disruptor.clj: 74)~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.daemon.executor $ fn__5043 $ fn__5056 $ fn__5109.invoke (executor.clj: 861) ~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.util $ async_loop $ fn__557.invoke (util.clj: 484) [storm-core-1.2.1.jar: 1.2.1] в clojure.lang.AFn.выполнить (AFn.java:22) [clojure-1.7.0.jar :?] в java.lang.Thread.run (Thread.java:748) [?: 1.8.0_171] Вызвано: java.lang.NullPointerException вorg.apache.storm.poutEmitter.java:127) ~ [stormjar.jar :?] at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch (KafkaTridentSpoutEmitter.java:51) ~ [stormjar.jar :?] илиstorm.trident.spout.OpaquePartitionedTridentSpoutExecutor $ Emitter.emitBatch (OpaquePartitionedTridentSpoutExecutor.java:141) ~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.trident.spout.TridentSpout.TridentSpoutout.java: 82) ~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.trident.topology.TridentBoltExecutor.execute (TridentBoltExecutor.java:383) ~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.daemon.executor $ fn__5043 $ tuple_action_fn__5045.invoke (executor.clj: 739) ~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.daemon.executor $ mk_task_receiver $ fn__4964.invoke (executor.clj: 468) ~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.disruptor $ clojure_handler $ reify__4475.onEvent(disruptor.clj: 41) ~ [storm-core-1.2.1.jar: 1.2.1] в org.apache.storm.utils.DisruptorQueue.consumeBatchToКурсор (DisruptorQueue.java:509) ~ [storm-core-1.2.1.jar: 1.2.1] ... еще 6

Есть предложения по этому поводу?

1 Ответ

0 голосов
/ 29 мая 2018

Ваша трассировка стека показывает, что вы попали https://issues.apache.org/jira/browse/STORM-3046.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...