Материализовать KStream в глобально общий магазин? - PullRequest
0 голосов
/ 20 февраля 2020

Я использую API Kafka Streams в приложении Java (Spring Cloud Stream). У меня есть конкретный вариант использования, как показано ниже:

  • Мое приложение будет потреблять из topi c A, а также производить и потреблять в / из topi c B.
  • Для Каждое сообщение в topi c A содержит набор соответствующих сообщений, созданных для topi c B, которые приложение использует для отслеживания изменений внутреннего состояния. Он использует topi c B с KStream, чтобы материализовать это состояние как хранилище с запросами.

Так как будет запущено несколько экземпляров приложения, и не может быть гарантировано, какие конкретные разделы любого из topi c будет назначено экземплярам, ​​обязательно, чтобы хранилище состояний было общим между приложениями. В противном случае, если произойдет перебалансировка для topi c B, экземпляры приложения могут потерять информацию о состоянии, которую они отслеживают для сообщений в topi c A. Рассмотрим следующий сценарий:

  • Экземпляр 1 имеет раздел 1 для topi c A и раздел 1 для topi c B.
  • Произойдет перебалансировка разделов для topi c B.
  • Экземпляр 1 теперь имеет раздел 1 topi c A (без изменений), но имеет раздел 2 из topi c B.
  • Экземпляр 1 теперь потерял доступ к данным в хранилище состояний, которое он создал, когда имел раздел 1 для topi c B.

Такая же ситуация возникает, если перебалансировка происходит только для топи c A.

Возможно ли материализоваться в "глобальное хранилище состояний"? Я понимаю, что существует концепция GlobalKTable, но мне нужно использовать абстракцию KStream, так как мне нужен доступ ко всему потоку событий. Для справки мой потребитель KStream выглядит следующим образом:

    @StreamListener(INPUT_TOPIC)
    public void consumeKStream(KStream<String, Pojo> kStream) {
        kStream.groupByKey(Serialized.with(keySerde, valueSerde)).aggregate(HashMap::new, (key, value, map) -> {
            map.put(value.getFoo(), value.getBar()); return map;
        }, Materialized.<String, Map<Foo, Bar>, KeyValueStore<Bytes, byte[]>>as(STATE_STORE_NAME)
                .withKeySerde(keySerde).withValueSerde(valueMapSerde));
    }

1 Ответ

0 голосов
/ 23 февраля 2020

Если вы читаете из topi c A и из topi c B, и у вас есть топология, которая материализует данные из topi c B и выполняет поиск в хранилище для записи topi c A, у вас будет гарантия того, что экземпляр получит совместное распределение. Следовательно, сценарий, который вы описываете, никогда не случится.

Вы можете убедиться в этом, проверив вас Topology (через describe()), который включает в себя под-топологии. Под-топологии выполняются, поскольку задачи и задачи имеют гарантированно совместное назначение ввода topi c.

Сравнение: https://docs.confluent.io/current/streams/architecture.html#parallelism -модель

...