bin/kafka-streams-application-reset.sh
находится в каталоге установки kafka. а тебе это действительно нужно? для «сброса» можно просто обновить свойство application.id
до любого нового значения конфигурации потоков kafka. Тем не менее, если вы используете автоматический сброс смещения как самый ранний, инструмент сброса не удалит уже существующие недействительные записи. вам может понадобиться auto.offset.reset: latest
, но это зависит от вашего варианта использования.
в случае, если вы хотите просто пропустить и зарегистрировать недействительные входящие сообщения, вы можете использовать свойство kafka streams
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler