java.nio.channels.ClosedChannelException при использовании сообщения из ливня - PullRequest
0 голосов
/ 16 мая 2018

Я написал топологию шторма, при которой данные, извлекаемые из kafka через носик kafka, работают нормально в моей локальной среде, но в кластере

Я получаю следующую ошибку:

2018-05-16 18: 25: 59.358 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor [20 20] [INFO] Задача [1/1] Обновление соединений диспетчера разделов 2018-05-16 18: 25: 59.359 oaskDynamicBrokersReader Thread-25-kafkaSpout-executor [20 20] [INFO] Чтение информации о разделе из zookeeper: GlobalPartitionInformation {topic = data-ops, partitionMap = {0 = uat-datalake-node2 .org: 6667}} 2018-05-16 18: 25: 59.359 oaskKafkaUtils Thread-25-kafkaSpout-executor [20 20] [INFO] Задание [1/1] назначено [Partition {host = uat-datalake-node2.org: 6667, topic = data-ops, partition = 0}] 2018-05-16 18: 25: 59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor [20 20] [INFO] Задача [1/1] Удаленные менеджеры разделов: [] 2018-05-16 18: 25: 59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor [20 20] [INFO] Задача [1/1] Новые менеджеры разделов: [] 2018-05-16 18: 25: 59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor [20 20] [INFO] Задание [1/1] Завершено обновление 2018-05-16 18: 25: 59.361 k.c.SimpleConsumer Thread-25-kafkaSpout-executor [20 20] [INFO] Повторное подключение из-за ошибки: java.nio.channels.ClosedChannelException в kafka.network.BlockingChannel.send (BlockingChannel.scala: 110) ~ [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer.liftedTree1 $ 1 (SimpleConsumer.scala: 85) [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer.kafka $ consumer $ SimpleConsumer $$ sendRequest (SimpleConsumer.scala: 83) [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply $ mcV $ sp (SimpleConsumer.scala: 132) [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (SimpleConsumer.scala: 132) [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (SimpleConsumer.scala: 132) [kafka_2.10-0.10.2.1.jar :?] at kafka.metrics.KafkaTimer.time (KafkaTimer.scala: 33) [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1.apply $ mcV $ sp (SimpleConsumer.scala: 131) [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1.apply (SimpleConsumer.scala: 131) [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1.apply (SimpleConsumer.scala: 131) [kafka_2.10-0.10.2.1.jar :?] at kafka.metrics.KafkaTimer.time (KafkaTimer.scala: 33) [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer.fetch (SimpleConsumer.scala: 130) [kafka_2.10-0.10.2.1.jar :?] в kafka.javaapi.consumer.SimpleConsumer.fetch (SimpleConsumer.scala: 47) [kafka_2.10-0.10.2.1.jar :?] в org.apache.storm.kafka.KafkaUtils.fetchMessages (KafkaUtils.java:191) [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.kafka.PartitionManager.fill (PartitionManager.java:189) [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.kafka.PartitionManager.next (PartitionManager.java:138) [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.kafka.KafkaSpout.nextTuple (KafkaSpout.java:135) [storm-kafka-1.0.1.jar: 1.0.1] at org.apache.storm.daemon.executor $ fn__6505 $ fn__6520 $ fn__6551.invoke (executor.clj: 651) [storm-core-1.0.1.2.5.3.0-37.jar: 1.0.1.2.5.3.0- 37] в org.apache.storm.util $ async_loop $ fn__554.invoke (util.clj: 484) [storm-core-1.0.1.2.5.3.0-37.jar: 1.0.1.2.5.3.0-37] at clojure.lang.AFn.run (AFn.java:22) [clojure-1.7.0.jar :?] на java.lang.Thread.run (Thread.java:748) [?: 1.8.0_144] 2018-05-16 18: 26: 09.372 o.a.s.k.KafkaUtils Thread-25-kafkaSpout- executor [20 20] [WARN] Сетевая ошибка при получении сообщений: java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor $ SocketInputStream.read (SocketAdaptor.java:211) ~ [?: 1.8.0_144]at sun.nio.ch.ChannelInputStream.read (ChannelInputStream.java:103) ~ [?: 1.8.0_144] at java.nio.channels.Channels $ ReadableByteChannelImpl.read (Channels.java:385) ~ [?: 1.8.0_144] в org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:81) ~ [kafka-clients-0.10.2.1.jar :?] в kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala:129) ~ [kafka_2.10-0.10.2.1.jar :?] в kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120) ~ [kafka_2.10-0.10.2.1.jar :?] в kafka.consumer.SimpleConsumer.liftedTree1 $ 1 (SimpleConsumer.scala: 99) ~ [kafka_2.10-0.10.2.1.jar :?] at kafka.consumer.SimpleConsumer.kafka $ customer $ SimpleConsumer $$ sendRequest (SimpleConsumer.scala: 83) ~ [kafka_.10-0.10.2.1.jar :?] at kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply $ mcV $ sp (SimpleConsumer.scala: 132) ~ [kafka_2.10-0.10.2.1.jar :?] at kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (SimpleConsumer.scala: 132) ~ [kafka_2.10-0.10.2.1.jar :?] at kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (SimpleConsumer.scala: 132) ~ [kafka_2.10-0.10.2.1.jar:?] at kafka.metrics.KafkaTimer.time (KafkaTimer.scala: 33) ~ [kafka_2.10-0.10.2.1.jar :?] at kafka.consumer.SimpleConsumer $$ anonfun $ получить $ 1.apply $ mcV $ sp(SimpleConsumer.scala: 131) ~ [kafka_2.10-0.10.2.1.jar :?] at kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1.apply (SimpleConsumer.scala: 131) ~ [kafka_2.10-0.10.2.1.jar :?] в kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1.apply (SimpleConsumer.scala: 131) ~ [kafka_2.10-0.10.2.1.jar :?] в kafka.metrics.KafkaTimer.time (KafkaTimer.scala: 33) ~ [kafka_2.10-0.10.2.1.jar :?] at kafka.consumer.SimpleConsumer.fetch (SimpleConsumer.scala: 130) ~ [kafka_2.10-0.10.2.1.jar :?] вkafka.javaapi.consumer.SimpleConsumer.fetch (SimpleConsumer.scala: 47) ~ [kafka_2.10-0.10.2.1.jar :?] at org.apache.storm.kafka.KafkaUtils.fetchMessages (KafkaUtils.java:191) [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.kafka.PartitionManager.fill (PartitionManager.java:189) [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.kafka.PartitionManager.next (PartitionManager.java:138) [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.kafka.KafkaSpout.nextTuple (KafkaSpout.java:135) [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.daemon.executor $fn__6505 $ fn__6520 $ fn__6551.invoke (executor.clj: 651) [storm-core-1.0.1.2.5.3.0-37.jar: 1.0.1.2.5.3.0-37] в org.apache.storm.util $async_loop $ fn__554.invoke (util.clj: 484) [storm-core-1.0.1.2.5.3.0-37.jar: 1.0.1.2.5.3.0-37] в clojure.lang.AFn.run (AFn.java: 22) [clojure-1.7.0.jar :?] at java.lang.Thread.run (Thread.java:748) [?: 1.8.0_144] 2018-05-16 18: 26: 09.373 oaskKafkaSpout Thread-25-kafkaSpout-executor [20 20] [WARN] Ошибка при получении org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException в org.apache.storm.kafka.KafkaUtils.fetchMessages (KafkaUtils.java:1)storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.kafka.PartitionManager.fill (PartitionManager.java:189) ~ [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.kafka.PartitionManager.next (PartitionManager.java:138) ~ [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.kafka.KafkaSpout.nextTuple (KafkaSpout.java:135) [storm-kafka-1.0.1.jar: 1.0.1] в org.apache.storm.daemon.executor $ fn__6505 $ fn__6520 $ fn__6551.invoke (executor.clj: 651)[storm-core-1.0.1.2.5.3.0-37.jar: 1.0.1.2.5.3.0-37] в org.apache.storm.util $ async_loop $ fn__554.invoke (util.clj: 484) [storm-core-1.0.1.2.5.3.0-37.jar: 1.0.1.2.5.3.0-37] at clojure.lang.AFn.run (AFn.java:22) [clojure-1.7.0.jar :?] at java.lang.Thread.run (Thread.java:748) [?: 1.8.0_144] Вызывается: java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor $ SocketInputStream.read (SocketAdaptor.java:211)~ [: 1.8.0_144]at sun.nio.ch.ChannelInputStream.read (ChannelInputStream.java:103) ~ [?: 1.8.0_144] в java.nio.channels.Channels $ ReadableByteChannelImpl.read (Channels.java:385) ~ [?: 1.8.0_144] в org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:81) ~ [kafka-clients-0.10.2.1.jar :?] в kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129) ~ [kafka_2.10-0.10.2.1.jar :?] в kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120) ~ [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer.liftedTree1 $ 1 (SimpleConsumer.scala: 99) ~ [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer.kafka $ consumer $ SimpleConsumer $$ sendRequest (SimpleConsumer.scala: 83) ~ [kafka_2.10-0.10.2.1.jar :?] at kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply $ mcV $ sp (SimpleConsumer.scala: 132) ~ [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (SimpleConsumer.scala: 132) ~ [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (SimpleConsumer.scala: 132) ~ [kafka_2.10-0.10.2.1.jar :?] at kafka.metrics.KafkaTimer.time (KafkaTimer.scala: 33) ~ [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1.apply $ mcV $ sp (SimpleConsumer.scala: 131) ~ [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1.apply (SimpleConsumer.scala: 131) ~ [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer $$ anonfun $ fetch $ 1.apply (SimpleConsumer.scala: 131) ~ [kafka_2.10-0.10.2.1.jar :?] at kafka.metrics.KafkaTimer.time (KafkaTimer.scala: 33) ~ [kafka_2.10-0.10.2.1.jar :?] на kafka.consumer.SimpleConsumer.fetch (SimpleConsumer.scala: 130) ~ [kafka_2.10-0.10.2.1.jar :?] в kafka.javaapi.consumer.SimpleConsumer.fetch (SimpleConsumer.scala: 47) ~ [kafka_2.10-0.10.2.1.jar :?] в org.apache.storm.kafka.KafkaUtils.fetchMessages (KafkaUtils.java:191) ~ [storm-kafka-1.0.1.jar: 1.0.1] ... еще 7

1 Ответ

0 голосов
/ 16 мая 2018

Похоже, у вас есть тайм-аут, когда работник Storm попытался прочитать у брокера Kafka.Может быть, связь между ними ненадежная или медленная?

Тем не менее трассировка стека, кажется, говорит о том, что потребитель переподключился, поэтому, если это случается редко, возможно, у вас просто произошел сбой в связи между работникоми Kafka.

Если это происходит регулярно и вы уверены, что соединение стабильно, я постараюсь задать вопрос в списке рассылки Kafka по адресу https://kafka.apache.org/contact. Если вы опубликуете свою проблему и какую версию Kafkaвы используете, они могут сказать вам, есть ли проблемы, которые могут привести к тайм-ауту сокета для потребителя.

...