как использовать kafka-streams-application-reset - PullRequest
0 голосов
/ 13 января 2019

Я сейчас пробую потоки кафки. В процессе создания моего приложения я отправил несколько недопустимых событий, которые теперь вызывают исключения при десериализации.

Я бы хотел использовать kafka-streams-application-reset, как предложено здесь , но я не уверен, где мне взять фактический сброс приложения kafka-streams, который должен быть нашли?

1 Ответ

0 голосов
/ 13 января 2019

bin/kafka-streams-application-reset.sh находится в каталоге установки kafka. а тебе это действительно нужно? для «сброса» можно просто обновить свойство application.id до любого нового значения конфигурации потоков kafka. Тем не менее, если вы используете автоматический сброс смещения как самый ранний, инструмент сброса не удалит уже существующие недействительные записи. вам может понадобиться auto.offset.reset: latest, но это зависит от вашего варианта использования.

в случае, если вы хотите просто пропустить и зарегистрировать недействительные входящие сообщения, вы можете использовать свойство kafka streams default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler

...