kafka `consumer.poll` застревает при достижении конца темы - PullRequest
0 голосов
/ 19 июня 2019

Я написал простой клиент для чтения из темы Кафки, которая работала хорошо, пока я не понял, что customer.poll будет блокироваться и никогда не вернется, когда достигнет конца темы.

Ниже приведен пример моего кода

    Observable
    .repeatEval(consumer.poll(java.time.Duration.ofMillis(100)))
    .timeoutOnSlowUpstreamOn(FiniteDuration(1000, MILLISECONDS), Observable.empty)
    .filter(_ ne null)

Я ожидал, что в течение 1 секунды ничего не может быть использовано из темы, timeOnSlowUpStreamOn преобразует исходный источник в пустой Observable. Однако, поскольку Scheduler блокируется на consumer.poll, когда он достигает конца темы, timeout никогда не происходит.

Глядя на дамп потока, программа застревает на:

at org.apache.kafka.common.network.Selector.select(Selector.java:689)
at org.apache.kafka.common.network.Selector.poll(Selector.java:409)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)

Мне совершенно все равно, это по замыслу или что-то вроде "ошибки". Мне просто интересно, есть ли способ вернуть consumer.poll (обычным способом или с помощью исключения): блокировка планировщика - это последнее, что я хочу видеть в своем коде.

1 Ответ

0 голосов
/ 20 июня 2019

Я не понял, KafkaConsumer не является потокобезопасным. Более чем один поток, вызывающий свои методы одновременно, выдаст ConcurrentModificationException. После некоторой правильной синхронизации проблема решается.

...