Невозможно обмениваться данными в государственном хранилище между под-топологиями приложения Kafka Stream - PullRequest
0 голосов
/ 24 февраля 2020

Примечание: это неверный вопрос. Пожалуйста, игнорируйте.

У меня KSA прослушивает две темы (две под-топологии), одна (под-топология A) записывает в хранилище состояний, а другая (под-топология B) читает из хранилища состояний ,

Запись

...
  stream
      .mapValues(v -> new Version(v.getHeader().getOccurredAt().getSeconds(), v.getVersion().getValue()))
      .groupByKey()
      .aggregate(
        () -> new Version(0,0),
        (aggKey, newValue, aggValue) -> aggValue.getTimestamp() > newValue.getTimestamp() ? aggValue : newValue,
      Materialized.<String, AgentVersion, KeyValueStore<Bytes, byte[]>>as(conf.versionStoreName())
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.serdeFrom(
          new Version.Serializer(), new Version.Deserializer())));

Чтение

 ReadOnlyKeyValueStore<String, Version> getVersionStore() {
  return app().store(conf.versionStoreName(), QueryableStoreTypes.keyValueStore());
}

Однако я обнаружил, что B не может получить данные, записанные A (A может правильно получить дату) .
Я что-то пропустил?

1 Ответ

0 голосов
/ 25 февраля 2020

По своей сути, подтопологии изолированы друг от друга. Если вы хотите разрешить доступ из одной под-топологии к хранилищу из другой под-топологии, вам нужно соединить обе под-топологии в одну. Например, добавив соответствующее хранилище к процессору / преобразователю.

Ср. https://docs.confluent.io/current/streams/architecture.html#stream -разбиения-и-задачи

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...