Я написал простой клиент для чтения из темы Кафки, которая работала хорошо, пока я не понял, что customer.poll будет блокироваться и никогда не вернется, когда достигнет конца темы.
Ниже приведен пример моего кода
Observable
.repeatEval(consumer.poll(java.time.Duration.ofMillis(100)))
.timeoutOnSlowUpstreamOn(FiniteDuration(1000, MILLISECONDS), Observable.empty)
.filter(_ ne null)
Я ожидал, что в течение 1 секунды ничего не может быть использовано из темы, timeOnSlowUpStreamOn
преобразует исходный источник в пустой Observable
. Однако, поскольку Scheduler
блокируется на consumer.poll
, когда он достигает конца темы, timeout
никогда не происходит.
Глядя на дамп потока, программа застревает на:
at org.apache.kafka.common.network.Selector.select(Selector.java:689)
at org.apache.kafka.common.network.Selector.poll(Selector.java:409)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
Мне совершенно все равно, это по замыслу или что-то вроде "ошибки". Мне просто интересно, есть ли способ вернуть consumer.poll
(обычным способом или с помощью исключения): блокировка планировщика - это последнее, что я хочу видеть в своем коде.