Я новичок в потоке Кафки, мой вариант использования - сравнить значение каждой отдельной входящей записи из входной темы со значением из отдельной предыдущей записи и, если условие сравнения истинно, отправить новую запись, содержащую результат сравненияс индексом каждой сравниваемой записи в теме результатов, иначе не отправлять ничего.(обратите внимание, что все входящие записи могут иметь уникальный ключ для каждой записи или нулевой ключ).
Сделать это в API-интерфейсе Kafka для потребителей и производителей очень легко, но ((без использования внешней БД для хранения предыдущей записи)) я пытаюсь использовать только (Kafka streams DSL API), (включая KTable и KStream, с их внутренними методами, такими как, агрегировать, сокращать и т.д.состояние, чтобы сравнить его с текущим, затем сохранить текущую запись вместо старой, чтобы сравнить ее со следующей входящей записью.Несколько подходов пытаются использовать Processor API вместо Stream DSL API, но он включает в себя большую сложность, и я не совсем понял это.Вот почему я пытаюсь решить мою проблему с Stream DSL API.Но до сих пор, к сожалению, у меня ничего не получалось.
На самом деле, до сих пор у меня ничего не получалось.Можете ли вы помочь мне, предоставив подробный пример кода, чтобы сделать это с помощью Kafka Stream DSL?