Как выполнить ручное подтверждение смещения из Spring ListenerContainerIdleEvent - PullRequest
3 голосов
/ 10 июля 2019

У меня есть прослушиватель Kafka, который реализует интерфейс прослушивателя подтверждающего сообщения со следующими свойствами:

  1. ackMode - MANUAL_IMMEDIATE
  2. idleEventInterval - 3 мин.

При получении сообщения от прослушивателя он решает, следует ли подтверждать конкретную запись с помощью acceptment.acknowledge (), и он работает как положено.

Кроме того, у меня есть сценарий для подтверждения последнего номера смещения (сохранение его в памяти) через X минут (также, если сообщения не поступили).Чтобы преодолеть это требование, я решил использовать ListenerContainerIdleEvent, который запускается каждые 3 минуты в соответствии с моей конфигурацией.

Мои вопросы:

  1. Есть ли способ подтвердить смещение Кафки как триггер для события простоя?Событие idle содержит ссылку на KafkaMessageListenerContainer, но оно инкапсулирует ListenerConsumer, который содержит KafkaConsumer.

  2. синхронизируется ли отправка события ожидания сообщения (с тем же потоком KafkaListenerConsumer)?Исходя из кода, реализацией по умолчанию является SimpleApplicationEventMulticaster, который инициализируется без TaskExecutor, поэтому он вызывает прослушиватель в том же потоке.Вы можете это одобрить?

Я использую spring-kafka 1.3.9.

1 Ответ

0 голосов
/ 10 июля 2019
  1. Да, просто сохраните ссылку на последний Acknowledgment и снова наберите acknowledge().

  2. Да, событие публикуется в потоке потребителя по умолчанию.

Даже если событие опубликовано в другом потоке (исполнитель в мультикастере), оно все равно должно работать, потому что вместо прямой фиксации фиксация будет помещаться в очередь и обрабатываться потребителем при выходе из опроса.

См. Логику в processAck().

В более новых версиях (начиная с 2.0) событие имеет ссылку на потребителя, поэтому вы можете напрямую взаимодействовать с ним (получить текущую позицию и зафиксировать ее снова), если событие публикуется в потоке потребителя.

...