Проблема: мне нужно узнать, как отправлено сообщение за последние, например, 24 часа.У меня есть следующий поток и хранилище состояний для поиска.
@SendTo(Bindings.MESSAGE_STORE)
@StreamListener(Bindings.MO)
public KStream<?, ?> groupBySender(KStream<String, Message> messages) {
return messages.selectKey((key,message) -> message.from)
.map((k,v) -> new KeyValue<>(k, v.sentAt.toString()))
.groupByKey()
.reduce((oldTimestamp, newTimestamp) -> newTimestamp,
Materialized.as(AggregatorApplication.MESSAGE_STORE))
.toStream();
}
все работает нормально
[
"key=123 value=2019-06-21T13:29:05.509Z",
"key=from value=2019-06-21T13:29:05.509Z",
]
, поэтому посмотрите как:
store.get(from);
, но я хотел бы автоматически удалять записи старше 24 часов из вхранить, в настоящее время они будут сохраняться вероятно навсегда
Есть ли лучший способ, как это сделать?может быть, какая-то оконная операция или так?