Остановка ConcurrentMessageListenerContainer в 1.3.5 - PullRequest
0 голосов
/ 01 июля 2018

У меня есть один вопрос и больше попыток понять, чтобы я мог правильно его реализовать. Требование заключается в том, чтобы остановить ConcurrentMessageListenerContainer, вызвав метод stop для контейнера, который OOTB повторяет для KafkaMessageListenerContainer (на основе определенного параллелизма), и вызвать stop для каждого потока-потребителя.

Просто к вашему сведению, я на 1.3.5 и не могу перейти на 2. * из-за Spring Boot 1.5. *.

Конфигурация: Допустим, у меня есть тема с 5 разделами и параллелизмом, определенным как 5. Использование Batchlistener, поэтому количество записей в пакете = 100 для каждого опроса.

Вопрос: Когда мы вызываем stop для контейнера, он появляется внутренне, он устанавливает значение false для каждого KafkaMessageListenerContainer и вызывает wakeup для listenerConsumer.

setRunning(false);
this.listenerConsumer.consumer.wakeup();

Во время тестирования того, что я наблюдал, вызывая остановку на контейнере в отдельном потоке, он делает следующее:

1) Останавливает listenerConsumer, и это точно работает. После вызова стоп больше не происходит опрос. 2) Кажется, что любой listenerConsumer уже опросил 100 записей, и в середине обработки он завершает выполнение перед остановкой. # 2 для дизайна, который вызывает остановку контейнера, только посылает пробуждение, чтобы остановить следующий опрос? Потому что я не вижу обработки ниже в KafkaMessageListenerContainer.run ()

catch (WakeupException e) {
     //Ignore, we're stopping
}

Еще одна вещь, даже в весенней версии kafka 2.1 с наличием ContainerStoppingBatchErrorHandler, она вызывает ту же остановку контейнера, так что я полагаю, что в моем понимании, как справиться с этим сценарием ..

В заключение, если выше много подробностей, я хочу завершить listenerThread, если останов был вызван из отдельного потока. У меня есть коммит смещения вручную, так что воспроизведение партии в порядке.

Привет Гэри,

Поскольку вы предложили иметь прослушиватель потребительского программного обеспечения, мой вопрос относится к остановке прослушивателя через контейнер. Как только контейнер вызывает остановку, поток слушателя, будь то BatchListner, должен быть прерван из выполнения. Я знаю, что слушатель получил полный опрос записей, и вопрос не в том, чтобы потерять смещение, поскольку ack находится на уровне пакета.

1 Ответ

0 голосов
/ 01 июля 2018

Непонятно, в чем ваша проблема; контейнер останавливается, когда слушатель возвращается после обработки пакета.

Если вы хотите управлять смещениями на уровне записи, не используйте BatchMessageListener; используйте вместо этого уровень записи MessageListener.

В первом случае вся партия записей, возвращаемая опросом, считается единицей работы; смещения для всего пакета фиксируются или нет.

Даже с прослушивателем уровня записи контейнер не остановится, пока текущий пакет записей не будет отправлен слушателю; в этом случае, что вы должны сделать, зависит от режима подтверждения; при ручном подтверждении просто игнорируйте те записи, которые получены после остановки. Если контейнер управляет квитанциями; генерировать исключение для записей, которые вы хотите удалить (с ackOnError=false - по умолчанию true).

К сожалению, вы не можете перейти к более позднему выпуску.

С 2,1; там гораздо больше гибкости. Например; SeekToCurrentErrorHandler и SeekToCurrentBatchErrorHandler предоставляются.

SeekToCurrentErrorHandler расширяет RemainingRecordsErrorHandler, что означает, что оставшиеся записи в опросе отправляются обработчику ошибок вместо слушателя; когда контейнер обнаруживает, что RemainingRecordsErrorHandler, слушатель не получит оставшиеся записи в пакете, и обработчик может решить, что делать - остановить контейнер, выполнить поиск и т. д.

Таким образом, вам больше не нужно останавливать контейнер при получении плохой записи.

Все это предназначено для обработки ошибок / остановки контейнера из-за некоторых ошибок данных.

В настоящее время нет опции stopNow() для контейнеров - немедленно прекратите вызывать прослушиватель после обработки текущей записи и отбросьте все оставшиеся записи, чтобы они были повторно отправлены при запуске. В настоящее время вы должны отказаться от них самостоятельно, как я описал выше.

Мы могли бы рассмотреть возможность добавления такой опции, но это было бы как минимум 2.2.

Мы предоставляем исправления ошибок только в 1.3.x, и только в том случае, если нет обходного пути.

Это открытый исходный код, так что вы всегда можете раскошелиться на него и внести свой вклад в каркас.

...