Держите Реактора Кафки Потребителей Живыми - PullRequest
0 голосов
/ 20 января 2020

Мы используем Reactor Kafka для наших потребителей Kafka. У нас есть 5 разделов для конкретной топи c. Таким образом, мы создаем 5 потребителей для каждого раздела соответственно.

Теперь у нас есть требование, когда в момент, когда потребитель приходит, он обрабатывает количество сообщений, присутствующих в очереди, и затем go отключается. Потребитель просыпается через несколько часов и начинает обрабатывать сообщения из того же раздела.

Мы добились этого путем установки для MAX_POLL_INTERVAL_MS_CONFIG значений 36_00_000 и CONNECTIONS_MAX_IDLE_MS_CONFIG до 36_00_000, что будет больше, чем время ожидания. Потребитель потребляет все сообщения в очереди, а затем вызов Thread.sleep для потребителя переводит его в спящий режим.

Теперь проблема, с которой мы сталкиваемся, заключается в том, что иногда у нас появляются живые потребители еще не потребляет, даже если они вне потока. Потребители также не отписывались от разделов, иначе я знал бы из журналов: -

var receiverOptions = ReceiverOptions.<String, String>create(getConsumerProperties()) .addAssignListener(partitions -> log.info("Partitions Assigned "+ partitions.toString())) .addRevokeListener(partitions -> log.info("Partitions Revoked "+ partitions.toString())) .subscription(Collections.singleton(appConfig.getInitialScanTopicName()));

Есть ли способ отладки, почему потребители go зомби ie? Также где-то я читаю pause () и continue (), возможно ли их использовать с реакторной кафкой?

...