Оконные государственные магазины Kafka не убираются после хранения - PullRequest
0 голосов
/ 03 апреля 2019

По какой-то причине мои старые государственные хранилища не очищаются после истечения срока действия политики хранения.Я тестирую его локально, поэтому я отправляю одно тестовое сообщение каждые 5 минут или около того.У меня установлена ​​длительность хранения только для тестирования.retentionPeriod = 120, retentionWindowSize = 15, и я предполагаю, что сохранение дубликатов должно быть ложным.Когда это должно быть правдой?

Stores.persistentWindowStore(storeName,
                        Duration.of(retentionPeriod, ChronoUnit.SECONDS),
                        Duration.of(retentionWindowSize, ChronoUnit.SECONDS),
                        false)

Когда я захожу в каталог хранилища состояний, я хорошо вижу старые хранилища после истечения срока хранения.Например, store.1554238740000 (при условии, что число - эпоха мс).Я хорошо передаю 2-минутное время хранения, и этот каталог все еще там.

Что мне не хватает?

Обратите внимание, что в конечном итоге он очищается намного позже, чем я ожидал.Что вызывает очистку?

1 Ответ

0 голосов
/ 15 апреля 2019

Срок хранения является минимальной гарантией того, как долго хранятся данные. Чтобы сделать срок действия эффективным, так называемые сегменты используются для разделения временной шкалы на «сегменты». Только после истечения времени для всех данных в сегменте сегмент удаляется. По умолчанию Kafka Streams использует 3 сегмента. Таким образом, для вашего примера со временем удерживания 120 секунд каждый сегмент будет иметь размер 60 секунд (не 40 секунд). Причина состоит в том, что самый старый сегмент может быть удален только из всех данных, в которых прошло время хранения. Если размер сегмента составляет всего 40 секунд, для достижения этого потребуется 4 сегмента:

S1 [0-40) -- S2 [40,80) -- S3 [80,120)

Если запись с отметкой времени 121 должна быть сохранена, S1 еще нельзя удалить, поскольку она содержит данные для отметок времени от 1 до 40, которые еще не прошли период хранения. Таким образом, новый сегмент S4 потребуется. Для размера сегмента 60 достаточно 3 сегмента:

S1 [0-60) -- S2 [60,120) -- S3 [120,180)

В этом случае, если поступает запись с отметкой времени 181, всем данным в первом сегменте передается время хранения 181 - 120 = 61, и, таким образом, S1 можно удалить до создания S4.

Обратите внимание, что, начиная с Kafka 2.1, внутренний механизм остается тем же, однако Kafka Streams строго соблюдает период хранения на уровне приложения, т. Е. Запись отбрасывается, а чтение возвращает null для всех данных, переданных срок хранения (даже если данные все еще там, потому что сегмент все еще используется).

...