KafkaConsumer переходит в неопределенное состояние ожидания при опросе - PullRequest
0 голосов
/ 17 мая 2018

Я пытаюсь опросить тему kafka с помощью API KafkaConsumer. Но он переходит в состояние ожидания на неопределенный срок и не выполняется, даже если мы пропустили тайм-аут для опроса.

Из дампа потока он показывает состояние Runnable, я взял несколько дампов всегда, основной поток остается в той же позиции, я думаю, что он не выходит из ожидания.

"main" #1 prio=5 os_prio=0 tid=0x00007f42a800f000 nid=0x59 runnable [0x00007f42b0782000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000006c02e2088> (a sun.nio.ch.Util$2)
        - locked <0x00000006c02e2078> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000006c02e1f60> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:425)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
        at org.test.TestReceiver(TestReceiver:100)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Вот фрагмент кода .. только первая распечатка журнала.

LOG.info("Going to wait {}ms for ConsumerRecords", POLLING_TIMEOUT_MILLIS);
ConsumerRecords<String, String> records = consumer.poll(POLLING_TIMEOUT_MILLIS); 
LOG.info("Received {} ConsumerRecords to process.", (records != null ? records.count() : null));

Библиотечные версии ...
kafka_2.11: баночка: 0.9.0.0
Кафка-клиенты: баночка: 0.9.0.0

Ответы [ 2 ]

0 голосов
/ 29 мая 2018

Замечена ниже ошибка в кафке, после настройки offsets.topic.replication.factor = 1 в server.properties проблема решена.

ERROR [KafkaApi-0] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
0 голосов
/ 17 мая 2018

KafkaConsumer#poll() может блокироваться, если ему необходимо обновить свои метаданные, но не может подключиться к кластеру.

Адресовано через KIP-266: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886

...