У меня есть один вопрос и больше попыток понять, чтобы я мог правильно его реализовать. Требование заключается в том, чтобы остановить ConcurrentMessageListenerContainer, вызвав метод stop для контейнера, который OOTB повторяет для KafkaMessageListenerContainer (на основе определенного параллелизма), и вызвать stop для каждого потока-потребителя.
Просто к вашему сведению, я на 1.3.5 и не могу перейти на 2. * из-за Spring Boot 1.5. *.
Конфигурация:
Допустим, у меня есть тема с 5 разделами и параллелизмом, определенным как 5. Использование Batchlistener, поэтому количество записей в пакете = 100 для каждого опроса.
Вопрос:
Когда мы вызываем stop для контейнера, он появляется внутренне, он устанавливает значение false для каждого KafkaMessageListenerContainer и вызывает wakeup для listenerConsumer.
setRunning(false);
this.listenerConsumer.consumer.wakeup();
Во время тестирования того, что я наблюдал, вызывая остановку на контейнере в отдельном потоке, он делает следующее:
1) Останавливает listenerConsumer, и это точно работает. После вызова стоп больше не происходит опрос.
2) Кажется, что любой listenerConsumer уже опросил 100 записей, и в середине обработки он завершает выполнение перед остановкой.
# 2 для дизайна, который вызывает остановку контейнера, только посылает пробуждение, чтобы остановить следующий опрос? Потому что я не вижу обработки ниже в KafkaMessageListenerContainer.run ()
catch (WakeupException e) {
//Ignore, we're stopping
}
Еще одна вещь, даже в весенней версии kafka 2.1 с наличием ContainerStoppingBatchErrorHandler, она вызывает ту же остановку контейнера, так что я полагаю, что в моем понимании, как справиться с этим сценарием ..
В заключение, если выше много подробностей, я хочу завершить listenerThread, если останов был вызван из отдельного потока. У меня есть коммит смещения вручную, так что воспроизведение партии в порядке.
Привет Гэри,
Поскольку вы предложили иметь прослушиватель потребительского программного обеспечения, мой вопрос относится к остановке прослушивателя через контейнер. Как только контейнер вызывает остановку, поток слушателя, будь то BatchListner, должен быть прерван из выполнения. Я знаю, что слушатель получил полный опрос записей, и вопрос не в том, чтобы потерять смещение, поскольку ack находится на уровне пакета.