Как управлять смещениями в KafkaItemReader, используемом в весеннем пакетном задании, в случае возникновения каких-либо исключений в середине чтения сообщений - PullRequest
0 голосов
/ 20 февраля 2020

Я работаю над приложением весенней загрузки на основе Kafka. Моим требованием было создать выходной файл со всеми записями с использованием Spring Batch. Я создал весеннее пакетное задание, которое интегрировано с настроенным классом, который расширяет KafkaItemReader. Сейчас я не хочу фиксировать смещения, поскольку мне может понадобиться go прочитать некоторые записи из уже использованных смещений. Мой потребительский конфиг имеет следующие свойства:

enable.auto.commit: ложный автоматический сброс смещения: последний group.id:

Есть два сценария ios -> 1. Счастливый путь, где я могу прочитать все сообщения из kafka topi c и преобразовать их, а затем записать их в выходной файл, используя указанную выше конфигурацию. 2. Я получаю исключение при чтении сообщений, и я не уверен, как управлять смещениями в таких случаях. Даже если я go вернусь к остатку смещения, как убедиться, что это правильное смещение для сообщений. Я не сохраняю полезную нагрузку записи сообщения нигде, кроме того, что она идет к выходному файлу весеннего пакета.

1 Ответ

0 голосов
/ 20 февраля 2020

Для этого необходимо использовать постоянный репозиторий заданий и настроить KafkaItemReader на для сохранения его состояния . Состояние состоит в смещении каждого раздела, назначенного считывателю, и будет сохранено на границах чанка (иначе для каждой транзакции).

В сценарии перезапуска считыватель будет инициализирован с последним смещением для каждого раздела из контекста выполнения и возобновить с того места, где он остановился.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...