Я использую API Kafka Streams в приложении Java (Spring Cloud Stream). У меня есть конкретный вариант использования, как показано ниже:
- Мое приложение будет потреблять из topi c A, а также производить и потреблять в / из topi c B.
- Для Каждое сообщение в topi c A содержит набор соответствующих сообщений, созданных для topi c B, которые приложение использует для отслеживания изменений внутреннего состояния. Он использует topi c B с KStream, чтобы материализовать это состояние как хранилище с запросами.
Так как будет запущено несколько экземпляров приложения, и не может быть гарантировано, какие конкретные разделы любого из topi c будет назначено экземплярам, обязательно, чтобы хранилище состояний было общим между приложениями. В противном случае, если произойдет перебалансировка для topi c B, экземпляры приложения могут потерять информацию о состоянии, которую они отслеживают для сообщений в topi c A. Рассмотрим следующий сценарий:
- Экземпляр 1 имеет раздел 1 для topi c A и раздел 1 для topi c B.
- Произойдет перебалансировка разделов для topi c B.
- Экземпляр 1 теперь имеет раздел 1 topi c A (без изменений), но имеет раздел 2 из topi c B.
- Экземпляр 1 теперь потерял доступ к данным в хранилище состояний, которое он создал, когда имел раздел 1 для topi c B.
Такая же ситуация возникает, если перебалансировка происходит только для топи c A.
Возможно ли материализоваться в "глобальное хранилище состояний"? Я понимаю, что существует концепция GlobalKTable, но мне нужно использовать абстракцию KStream, так как мне нужен доступ ко всему потоку событий. Для справки мой потребитель KStream выглядит следующим образом:
@StreamListener(INPUT_TOPIC)
public void consumeKStream(KStream<String, Pojo> kStream) {
kStream.groupByKey(Serialized.with(keySerde, valueSerde)).aggregate(HashMap::new, (key, value, map) -> {
map.put(value.getFoo(), value.getBar()); return map;
}, Materialized.<String, Map<Foo, Bar>, KeyValueStore<Bytes, byte[]>>as(STATE_STORE_NAME)
.withKeySerde(keySerde).withValueSerde(valueMapSerde));
}