У меня есть потоковое приложение, которое непрерывно принимает поток координат вместе с некоторыми пользовательскими метаданными, которые также включают цепочку битов.Этот поток создается по теме kafka с использованием API производителя.Теперь другое приложение должно обработать этот поток [Streams API] и сохранить определенный бит из строки битов и генерировать оповещения, когда этот бит изменяется
Ниже представлен непрерывный поток сообщений, которые необходимо обработать
{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate alert with change: 0->1
{"device_id":"3","status_bits":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bits":"0"} // need to generate alert with change 1->0
Теперь я хотел бы написать эти оповещения в другую тему кафки, например
{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}
. Я могу сохранить текущий бит в хранилище состояний, используя что-то вроде
streamsBuilder
.stream("my-topic")
.mapValues((key, value) -> value.getStatusBit())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));
, ноЯ не могу понять, как я могу обнаружить это изменение из существующего бита.Нужно ли как-то запрашивать хранилище состояний внутри топологии процессора?Если да?Как?Если нет?Что еще можно сделать?
Любые предложения / идеи, которые я могу попробовать (возможно, полностью отличается от того, что я думаю), также приветствуются.Я новичок в Kafka и думаю, что с точки зрения событийно-ориентированных потоков ускользает от меня.
Заранее спасибо.