Я написал код для хранения обработанных сообщений потоков kafka в течение недели и проверяю полученные записи в этом хранилище, прежде чем продолжить процесс. Поэтому я использовал следующую конфигурацию хранилища состояний в приложении Kafka Streams.
private void addStore(StreamsBuilder streamsBuilder) {
WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(SmsKafkaStreamsConfig.STORE_NAME,
Duration.ofMillis(windowSize + (windowSize >> 1)), Duration.ofMillis(windowSize), false);
StoreBuilder storeBuilder = Stores.windowStoreBuilder(storeSupplier, Serdes.String(), Serdes.Bytes())
.withCachingEnabled();
streamsBuilder.addStateStore(storeBuilder);
}
Также для части дедупликации я написал следующий код в преобразователе.
private boolean checkDigest(String digest, WindowStore<String, Bytes> store) {
if (store.fetch(digest, System.currentTimeMillis() - digestWindowSize) == null) {
store.put(digest, STORE_VALUE, System.currentTimeMillis());
LOGGER.info(store.fetch(digest, System.currentTimeMillis() - digestWindowSize)+"");
LOGGER.info(store.fetch(digest, System.currentTimeMillis())+"");
return true;
} else {
return false;
}
}
У меня есть добавил две строки регистрации для проверки правильности, так как я не знаю их функциональности и правильности.
Мой первый вопрос: почему обе строки регистрации выводят null ? (Я хочу искать только данные прошлой недели до сих пор). И второй вопрос: если время хранения данных должно быть больше, чем windows размер в определении хранилища?