Время хранения в местном государственном магазине kafka / журнал изменений - PullRequest
0 голосов
/ 14 февраля 2019

Я использую Kafka и Kafka Streams как часть Spring Cloud Stream.Данные, которые передаются в моем приложении Kafka Streams, агрегируются и материализуются по определенным временным окнам:

Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
    oneHour.withLoggingEnabled(topicConfig);
    events
            .map(getStringSensorMeasurementKeyValueKeyValueMapper())
            .groupByKey()
            .windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
            .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                    (oneHour));

Как и задумано, материализуемая информация также поддерживается темой журнала изменений.

Наше приложение также имеет конечную точку отдыха, которая будет запрашивать хранилище состояний следующим образом:

 ReadOnlyWindowStore<String, Double> windowStore =  queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
 WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);

Глядя на настройки создаваемой темы журнала изменений, она гласит:

min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1

Я быПредположим, что местный государственный магазин будет хранить информацию как минимум 61 день (~ 2 месяца).Однако, похоже, что в хранилищах остается только последний день данных.

Что может привести к такому удалению данных?

Обновление с решением Потоки Кафкиверсия 2.0.1 не содержит метод Materialized.withRetention.Для этой конкретной версии я смог установить время хранения хранилищ состояний, используя следующий код, который решает мою проблему:

TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
    timeWindows.until(retentionMs);

, позволяющий писать мой код следующим образом:

...

.groupByKey()
        .windowedBy(timeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                (oneHour));
...

1 Ответ

0 голосов
/ 14 февраля 2019

Для оконных KTable с существует локальное время хранения и время хранения журнала изменений.Вы можете установить время хранения в локальном хранилище с помощью Materialized.withRetentionTime(...) - значение по умолчанию - 24 ч.

Для более ранней версии Kafka время хранения в локальном хранилище задается с помощью Windows#until().

Если создается новое приложение, разделы журнала изменений создаются с тем же временем хранения, что и время хранения в локальном хранилище.Однако, если вы вручную увеличите время хранения журнала, это не повлияет на время хранения вашего магазина, но вам необходимо соответствующим образом обновить код.Это также верно, когда тема журнала изменений уже существует: если вы изменяете время хранения локального хранилища, конфигурация темы журнала изменений не обновляется автоматически.

Для этого также существует Jira: https://issues.apache.org/jira/browse/KAFKA-7591

...