Выборка WindowStore в Kafka Streams всегда возвращает ноль - PullRequest
0 голосов
/ 20 апреля 2020

Я написал код для хранения обработанных сообщений потоков kafka в течение недели и проверяю полученные записи в этом хранилище, прежде чем продолжить процесс. Поэтому я использовал следующую конфигурацию хранилища состояний в приложении Kafka Streams.

 private void addStore(StreamsBuilder streamsBuilder) {
        WindowBytesStoreSupplier storeSupplier =  Stores.persistentWindowStore(SmsKafkaStreamsConfig.STORE_NAME,
                Duration.ofMillis(windowSize + (windowSize >> 1)), Duration.ofMillis(windowSize), false);
        StoreBuilder storeBuilder = Stores.windowStoreBuilder(storeSupplier, Serdes.String(), Serdes.Bytes())
                .withCachingEnabled();
        streamsBuilder.addStateStore(storeBuilder);
    }

Также для части дедупликации я написал следующий код в преобразователе.

    private boolean checkDigest(String digest, WindowStore<String, Bytes> store) {
        if (store.fetch(digest, System.currentTimeMillis() - digestWindowSize) == null) {
            store.put(digest, STORE_VALUE, System.currentTimeMillis());
            LOGGER.info(store.fetch(digest, System.currentTimeMillis() - digestWindowSize)+"");
            LOGGER.info(store.fetch(digest, System.currentTimeMillis())+"");
            return true;
        } else {
            return false;
        }
    }

У меня есть добавил две строки регистрации для проверки правильности, так как я не знаю их функциональности и правильности.

Мой первый вопрос: почему обе строки регистрации выводят null ? (Я хочу искать только данные прошлой недели до сих пор). И второй вопрос: если время хранения данных должно быть больше, чем windows размер в определении хранилища?

...