Как восстановить сообщение в нифи о кафке? - PullRequest
0 голосов
/ 28 апреля 2020

Сценарий:

Поскольку ConsumeKafka_2_0 потребляет сообщения в Кафке, смещение в Кафке становится равным 100. Когда Нифи нарушается, заставьте Нифи восстановить прежнее состояние со смещением в Кафаке 50. Однако на самом деле смещение в Kafka равно "100", потому что Kafka не восстанавливает.

Когда Nifi перезапускается, я предполагаю, что он использует сообщения со смещением "100", а не со смещением "50", потому что смещение равно "100 "в Кафке в то время. Так что некоторые сообщения о смещении" 50 "-" 100 "будет потеряно? Как их восстановить?

1 Ответ

0 голосов
/ 28 апреля 2020

Кафка отслеживает смещение на основе идентификатора группы потребителей (так называемый идентификатор группы в процессоре NiFi). Поэтому, если вы остановите и запустите процессор без изменения идентификатора группы, он всегда будет начинаться со следующего смещения для этого идентификатора группы.

В настоящее время NiFi не имеет способа запуска с указанным смещением c, но если вы измените идентификатор группы и выберете «Смещение сброса» как «самое раннее», то оно начнет потреблять с начала со смещением 0.

Как только данные будут получены из kafka и записаны в репозитории NiFi, смещение затем совершено. Таким образом, при перезапуске NiFi не требуется повторное рассмотрение этих сообщений, они уже хранятся локально в NiFi, и NiFi продолжит обрабатывать их с того места, где они были в потоке, когда вы перезапустили NiFi.

...