Зафиксировать кафки вручную во Flink - PullRequest
0 голосов
/ 08 февраля 2019

В потоковом приложении Flink, которое принимает сообщения от Kafka, 1) Как отключить автоматическую фиксацию?2) Как я могу подтвердить коммит от Flink после успешной обработки сообщения?

Спасибо.

1 Ответ

0 голосов
/ 08 февраля 2019

По по умолчанию Flink фиксирует смещения на контрольных точках.Вы можете отключить его следующим образом:

val consumer = new FlinkKafkaConsumer011[T](...)
c.setCommitOffsetsOnCheckpoints(false)

Если у вас не включены контрольные точки, см. здесь

Почему бы вы это сделали?Механизм контрольных точек Flink поможет вам решить эту проблему.Flink не будет фиксировать смещения при наличии сбоев.Если вы выбросите исключение в какой-то момент после потребителя Kafka, Flink попытается перезапустить поток с предыдущей успешной контрольной точки.Если ошибка не исчезнет, ​​Flink будет несколько раз перезагружаться в течение заданного количества раз, прежде чем произойдет сбой потока.Это означает, что маловероятно, что вы потеряете сообщения из-за того, что Flink отправляет сообщения, которые ваш код не обработал успешно.

...