Я хочу выполнить дедупликацию в моем приложении kafka-streams, которое использует хранилище состояний и использует этот очень хороший пример:
https://github.com/confluentinc/kafka-streams-examples/blob/5.5.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
У меня несколько вопросов об этом примере.
Как я правильно понимаю, в этом примере вкратце сделайте это:
- Сообщение приходит на вход topi c
- Посмотрите в магазине, если он не существует, напишите в хранилище состояний и верните
- , если он существует, отбросьте запись, поэтому применяется дедупликация.
Но в примере кода есть время размер окна, который вы можете определить. Кроме того, время хранения сообщений в хранилище состояний. Вы также можете проверить, находится ли запись в хранилище или нет, указав временную метку timeFrom + timeTo
final long eventTime = context.timestamp();
final WindowStoreIterator<String> timeIterator = store.fetch(
key,
eventTime - leftDurationMs,
eventTime + rightDurationMs
);
Какова фактическая цель timeTo и timeFrom? Я не уверен, почему я проверяю следующий временной интервал, потому что я проверяю будущие сообщения, которые еще не приходили в мой topi c?
Мой второй вопрос связан с этим временным интервалом и должен УДАЛИТЬ предыдущий временное окно?
Если я могу выполнить поиск во временном интервале, указав timeTo и timeFrom, почему важен размер временного окна?
Если я даю размер окна 12 часов, могу ли я гарантировать, что я дедуплицировал сообщения в течение 12 часов?
Я думаю так:
Первое сообщение приходит с клавишей «A» в первую минуту запуска приложения, через 11 часов, сообщение с клавишей «А» приходит снова. Могу ли я поймать это дублированное сообщение, указав достаточный интервал времени, например eventTime - 12 часов?
Спасибо за любые идеи!