Похоже, что архитектура потока данных не идемпотентна. Данные никогда не выходят из строя или дублируются Kafka, возникнет проблема с производителем. Kafka автоматически удалял данные из тем после периода хранения, поэтому просто подождите до тех пор, пока этот период не будет очищен, если вы беспокоитесь только о существующих данных. После того как Kafka удалит данные, любой потребитель, отстающий в чтениях (то есть хочет прочитать из удаленного смещения), должен будет установить auto.offset.reset
из earliest
или latest
, в противном случае потребитель выдаст ошибку OffsetOutOfRange
.
Между тем, если вы можете пропустить записи и начать опрос для определенного смещения / раздела, используя consumer.seek(partition, offset)
Решение будет зависеть от вашей бизнес-логики и шаблона входящих данных, но вам будет лучше, если вы решите проблемы производителя, а не разберетесь с потребителем.