Проверьте, заполнен ли StateStore полностью - PullRequest
2 голосов
/ 11 марта 2020

У меня есть компактный топи c с примерно 30 миллионами ключей. Мой App материализует этот топи c до KeyValueStore.

Как я могу проверить, заполнен ли KeyValueStore полностью? Если я ищу ключ через InteractiveQuery, мне нужно знать, если ключ отсутствует, потому что StateStore еще не готов или ключ действительно отсутствует.

Я материализую StateStore следующим образом:


  @Bean
  public Consumer<KTable<Key, Value>> process() {
    return stream -> stream.filter((k, v) -> v != null,
        Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
            .withKeySerde(new KeySerde())
            .withValueSerde(new ValueSerde()));
  }

Ответы [ 2 ]

3 голосов
/ 13 марта 2020

В общем случае не существует такого понятия, как «полностью загруженный», поскольку после запуска приложения в любой момент времени новые данные могут быть записаны на вход topi c, и эти новые данные будут считаны для обновления соответствующих Таблица.

Что вы можете сделать, это отслеживать отставание потребителей: в вашем приложении KafkaStreams#metrics() позволяет вам получить доступ ко всем показателям клиента (ie, потребитель / производитель) и Kafka Streams. Потребитель выставляет метрику c, называемую records-lag-max, которая может помочь.

Конечно, во время обычной обработки (при условии, что новые данные постоянно записываются на вход topi c), задержка потребителя будет go все время вверх и вниз.

0 голосов
/ 12 марта 2020

Обновлено : Я неправильно понял вопрос OP: «Как проверить, закончилась ли топология, материализовал ввод topi c в хранилище состояний» в «Процесс восстановления хранилища состояний»

Вы может получить KeyValueStore из вашего экземпляра KafkaStreams только в том случае, если состояние KafkaStreams изменилось с состояния REBALANCING на RUNNING. Вы можете проверить этот переход состояния, используя StreamsBuilderFactoryBeanCustomizer для доступа к базовому экземпляру KafkaStreams. Если вы просто хотите проверить, когда все хранилища состояний были заполнены полностью и когда поток потока kafka готов, чтобы вы могли получить KeyValueStore, вы можете прослушивать StateListener:

@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
    return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
            // set flag that `stateStore` store of current KafkaStreams has been fully restore
            // then you can get
        }
    }
}

или если вы хотите получить хранилище из KafkaStreams instance

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
        kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                //get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
                //and set flag that `stateStore` store of current KafkaStreams has been fully restore
            }
        });
    });
}

Подробнее в документации .

Обратите внимание, что должен быть только один экземпляр StreamsBuilderFactoryBeanCustomizer.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...