Государственный магазин Kafka недоступен в распределенной среде - PullRequest
0 голосов
/ 28 апреля 2020

У меня есть бизнес-приложение со следующими версиями

  • spring boot (2.2.0.RELEASE) spring-Kafka (2.3.1-RELEASE)
    spring-cloud-stream-binder -kafka (2.2.1-RELEASE)
    spring-cloud-stream-binder-kafka-core (3.0.3-RELEASE)
    spring-cloud-stream-binder-kafka-streams (3.0.3-RELEASE )

У нас есть около 20 пакетов. Каждая партия использует 6-7 тем для управления бизнесом. Каждая служба имеет свое собственное хранилище состояний, чтобы поддерживать статус пакета независимо от того, запущен он или находится в режиме ожидания. Использование приведенного ниже кода для запроса к хранилищу

@Autowired
private InteractiveQueryService interactiveQueryService;
      public ReadOnlyKeyValueStore<String, String> fetchKeyValueStoreBy(String storeName) {
        while (true) {
            try {
                log.info("Waiting for state store");
                return new ReadOnlyKeyValueStoreWrapper<>(interactiveQueryService.getQueryableStore(storeName,
                        QueryableStoreTypes.<String, String> keyValueStore()));
            } catch (final IllegalStateException e) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }

При развертывании приложения в одном экземпляре (Linux машина) все работает нормально. При развертывании приложения в 2 экземплярах мы находим следующие наблюдения

  1. Хранилище состояний доступно в одном экземпляре, а в другом нет.

  2. Когда запрос обрабатывается экземпляром, имеющим хранилище состояний все в порядке.

  3. Если запрос попадает в экземпляр, у которого нет хранилища состояний, приложение ожидает в течение времени l oop неопределенный (фрагмент кода выше).
  4. Пока экземпляр без хранилища ждет бесконечно, а если мы уничтожим другой экземпляр, приведенный выше код возвращает хранилище, и оно отлично обрабатывалось.

Понятия не имеем, чего нам не хватает.

1 Ответ

0 голосов
/ 28 апреля 2020

Если у вас несколько процессоров Kafka Streams, работающих с интерактивными запросами, приведенный выше код не будет работать так, как вы ожидаете. Он возвращает результаты только в том случае, если запрашиваемые ключи находятся на одном и том же сервере. Чтобы это исправить, вам нужно добавить свойство - spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port> для каждого экземпляра. Обязательно измените сервер и порт на правильные на каждом сервере. Затем вы должны написать код, подобный следующему:

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
                        key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

Пожалуйста, см. справочные документы для получения дополнительной информации. Вот пример кода , который демонстрирует это.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...