В настоящее время я использую AcknoledgingMessageListener для реализации потребителя Kafka с использованием spring-Kafka.Эта реализация помогает мне прослушивать определенную тему и обрабатывать сообщения вручную.Теперь мне нужно создать следующую возможность: допустим, что для некоторого исключения из среды или некоторого ввода неверных данных через эту тему мне нужно воспроизвести данные по теме с определенным смещением.Это будет ручной триггер (в основном через выполнение класса Java).
Было бы идеально, если бы я мог получать сообщения между этими смещениями и передавать их в тему воспроизведения, чтобы новый потребитель мог обрабатывать эти сообщения, таким образом сохраняя смещения в исходной теме.
- Интерфейс CosumerSeekAware - если это ответ, как я могу вызвать это внешне?Через, скажем, Mvn -Dexec.Я не уверен, что это вообще возможно
- Также позвольте мне сказать, что у меня есть отметка времени сбоя, можно ли проанализировать тему, чтобы найти смещение, соответствующее сбоям, чтобы я мог воспроизвести егосмещение?
- Могу ли я найти смещения, соответствующие некоторым конкретным данным, чтобы я мог воспроизвести эти конкретные смещения?
Все эти требования направлены на создание слоя устойчивости вокруг наших возможностей Kafka.Мне нужно, чтобы все они управлялись отдельным исполняемым классом, который можно запускать вручную, предоставляя соответствующие данные (например, метки времени и т. Д.).Этот класс должен определять смещения, а затем искать это смещение, извлекать сообщения, соответствующие этим смещениям, и публиковать их в отдельной теме.Может кто-нибудь, пожалуйста, укажите мне в правильном направлении?Боюсь, я хожу по кругу.
Мир, Анант