Я пытаюсь опросить тему kafka с помощью API KafkaConsumer. Но он переходит в состояние ожидания на неопределенный срок и не выполняется, даже если мы пропустили тайм-аут для опроса.
Из дампа потока он показывает состояние Runnable, я взял несколько дампов всегда, основной поток остается в той же позиции, я думаю, что он не выходит из ожидания.
"main" #1 prio=5 os_prio=0 tid=0x00007f42a800f000 nid=0x59 runnable [0x00007f42b0782000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000006c02e2088> (a sun.nio.ch.Util$2)
- locked <0x00000006c02e2078> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000006c02e1f60> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:425)
at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
at org.test.TestReceiver(TestReceiver:100)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Вот фрагмент кода .. только первая распечатка журнала.
LOG.info("Going to wait {}ms for ConsumerRecords", POLLING_TIMEOUT_MILLIS);
ConsumerRecords<String, String> records = consumer.poll(POLLING_TIMEOUT_MILLIS);
LOG.info("Received {} ConsumerRecords to process.", (records != null ? records.count() : null));
Библиотечные версии ...
kafka_2.11: баночка: 0.9.0.0
Кафка-клиенты: баночка: 0.9.0.0