Создание Глобального Государственного Магазина в Кафке Потоки (Весна) - PullRequest
1 голос
/ 16 марта 2020

Я новичок в Kafka и попытался создать небольшую реализацию Kafka KTable. Я успешно добавил KTable и смог запросить. Я использовал местный государственный магазин, и он работал как ожидалось. Ниже моя конфигурация локального хранилища состояний

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties) {
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MessageSerdes.class.getName());
    config.put(StreamsConfig.STATE_DIR_CONFIG, directory);
    //TODO : verify error strategy
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    return new KafkaStreamsConfiguration(config);
}

Теперь я хочу использовать глобальное состояние с использованием RP C. Меня смущают несколько вопросов. Чтобы добавить глобальное хранилище состояний, мне нужно добавить конечную точку RP C

config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "127.0.0.1:8080");

. В документации сказано:

"Единственное требование состоит в том, что слой RP C внедрен в приложении Kafka Streams "

  • Означает ли это, что нам нужно создать конечную точку клиента в приложении Kafka, если это так, если это приложение Spring Boot с веб-зависимостью, это как" localhost: 8080 "
  • Каким образом другие экземпляры этого приложения будут подключаться только через APPLICATION_SERVER_CONFIG (application.server) и выполнять интерактивные запросы или поддерживать состояние syn c. Я имею в виду Как предоставить дополнительную конфигурацию для других экземпляров того же приложения для создания syn c в глобальном состоянии.
  • Если создается глобальное состояние Нужно ли нам хранить резервную копию в Mongodb или другом месте по любой причине. (Отказоустойчивость) Учитывая, что БД никогда не будет такой же быстрой, как запись на диск, мы вообще о ней заботимся или должны полагаться на распределенную архитектуру

Было бы замечательно, если бы какая-то реализация Kafka Global State Store с примером дано.

1 Ответ

2 голосов
/ 16 марта 2020

Прежде всего, это не глобальное состояние, если вы хотите использовать глобальное состояние, вы должны создать GlobalKtable вместо KTable. Когда вы материализуете свой KTable в хранилище состояний, ваше хранилище состояний разбивается на разделы, и эти разделы распределяются по экземплярам вашего приложения, и каждый экземпляр может только запросить его хранилище состояний, следовательно, имя local state. Вы можете получить доступ к хранилищу других ваших экземпляров, добавив слой RP C к каждому экземпляру вашего приложения.

  1. Вы имеете в виду конечную точку сервера? Да.
  2. Документы Kafka утверждают, что Kafka Streams will keep track of the RPC endpoint information for every instance of an application, its state stores, and assigned stream partitions through instances of StreamsMetadata.

Используя экземпляр StreamsMetadata, вы можете получить HostStoreInfo экземпляра приложения, имеющего раздел содержащий ключ, который вы хотите запросить.

В вашем случае (который вы используете KTable), это локальное состояние, оно поддерживается внутренней топологией журнала изменений Kafka c, которая включает сжатие журнала , поэтому ваше локальное состояние - error допуск , ваше местное состояние восстанавливается с помощью этого журнала изменений topi c во время запуска, этот topi c имеет формат:
<application.id>-<your-local-state-store-name>-changelog

Вы можете просмотреть пример того, как вы можете сделать запрос удаленное хранилище состояний для всего приложения здесь .

...