Хранение записей в государственном хранилище только в течение определенного времени - PullRequest
0 голосов
/ 21 июня 2019

Проблема: мне нужно узнать, как отправлено сообщение за последние, например, 24 часа.У меня есть следующий поток и хранилище состояний для поиска.

@SendTo(Bindings.MESSAGE_STORE)
@StreamListener(Bindings.MO)
public KStream<?, ?> groupBySender(KStream<String, Message> messages) {

     return  messages.selectKey((key,message) -> message.from)
                     .map((k,v) -> new KeyValue<>(k, v.sentAt.toString()))
                     .groupByKey()
                     .reduce((oldTimestamp, newTimestamp) -> newTimestamp,
                                Materialized.as(AggregatorApplication.MESSAGE_STORE))
                     .toStream();

}

все работает нормально

[
    "key=123 value=2019-06-21T13:29:05.509Z",
    "key=from value=2019-06-21T13:29:05.509Z",
]

, поэтому посмотрите как:

store.get(from);

, но я хотел бы автоматически удалять записи старше 24 часов из вхранить, в настоящее время они будут сохраняться вероятно навсегда

Есть ли лучший способ, как это сделать?может быть, какая-то оконная операция или так?

1 Ответ

0 голосов
/ 25 июня 2019

Atm, KTables (которые в основном являются хранилищами значений ключей) не поддерживают TTL (ср. https://issues.apache.org/jira/browse/KAFKA-4212)

Текущая рекомендация заключается в использовании оконного хранилища, если вы хотите просрочить данные. Возможно, вы захотите использовать пользовательский .transform() вместо windowedBy().reduce(), чтобы обеспечить большую гибкость. (ср. https://docs.confluent.io/current/streams/developer-guide/processor-api.html)

...