По по умолчанию Flink фиксирует смещения на контрольных точках.Вы можете отключить его следующим образом:
val consumer = new FlinkKafkaConsumer011[T](...)
c.setCommitOffsetsOnCheckpoints(false)
Если у вас не включены контрольные точки, см. здесь
Почему бы вы это сделали?Механизм контрольных точек Flink поможет вам решить эту проблему.Flink не будет фиксировать смещения при наличии сбоев.Если вы выбросите исключение в какой-то момент после потребителя Kafka, Flink попытается перезапустить поток с предыдущей успешной контрольной точки.Если ошибка не исчезнет, Flink будет несколько раз перезагружаться в течение заданного количества раз, прежде чем произойдет сбой потока.Это означает, что маловероятно, что вы потеряете сообщения из-за того, что Flink отправляет сообщения, которые ваш код не обработал успешно.