Я использую Kafka и Kafka Streams как часть Spring Cloud Stream.Данные, которые передаются в моем приложении Kafka Streams, агрегируются и материализуются по определенным временным окнам:
Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
oneHour.withLoggingEnabled(topicConfig);
events
.map(getStringSensorMeasurementKeyValueKeyValueMapper())
.groupByKey()
.windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
Как и задумано, материализуемая информация также поддерживается темой журнала изменений.
Наше приложение также имеет конечную точку отдыха, которая будет запрашивать хранилище состояний следующим образом:
ReadOnlyWindowStore<String, Double> windowStore = queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);
Глядя на настройки создаваемой темы журнала изменений, она гласит:
min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1
Я быПредположим, что местный государственный магазин будет хранить информацию как минимум 61 день (~ 2 месяца).Однако, похоже, что в хранилищах остается только последний день данных.
Что может привести к такому удалению данных?
Обновление с решением Потоки Кафкиверсия 2.0.1 не содержит метод Materialized.withRetention.Для этой конкретной версии я смог установить время хранения хранилищ состояний, используя следующий код, который решает мою проблему:
TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
timeWindows.until(retentionMs);
, позволяющий писать мой код следующим образом:
...
.groupByKey()
.windowedBy(timeWindows)
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
...