У меня есть сообщения, которые я хотел бы объединить в состояние, но я также хотел бы использовать это состояние на этапе предварительной обработки, который также может обновлять другие состояния.Например, можно посмотреть, изменяет ли это сообщение какую-то часть состояния, и, если да, обновить тему, которая отслеживает эту конкретную часть состояния, а также само состояние.
Лучший способ, которым я мог быПредставьте, что для этого нужно было использовать Transformer, у которого был доступ к хранилищу состояний, но делайте это до агрегации состояния, чтобы я мог видеть значение этого состояния до его обновления.(Рассматриваемый Трансформер просто вернет исходное сообщение после возможного обновления дополнительного состояния.)
Однако я попадаю в проблему с курицей и яйцом:
Если я добавлю Materialized
с aggregate()
, как показано ниже, я получу ошибку топологии, указывающую, что хранилище состояний, к которому я пытаюсь получить доступ, еще не добавлено в топологию.
KStream<String, Message> stream = streamsBuilder.stream(config.getDefaultSourceTopicName(), Consumed.with(Serdes.String(), new MessageSerde()));
streamBuilder
.transformValues(
new MessagePreprocessorSupplier(
"state_store_topic_name"
),
"state_store_topic_name"
)
.groupByKey()
.aggregate(
() -> null,
new MyAggregator(),
Materialized.as("state_store_topic_name")
);
Это поднимает:
org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore state_store_topic_name is not added yet.
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStore(InternalTopologyBuilder.java:716)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStores(InternalTopologyBuilder.java:615)
at org.apache.kafka.streams.kstream.internals.KStreamImpl.transformValues(KStreamImpl.java:546)
at org.apache.kafka.streams.kstream.internals.KStreamImpl.transformValues(KStreamImpl.java:538)
at myapp.stream_processor.KafkaApplication.configureTopology(KafkaApplication.java:48)
Если я сначала пытаюсь создать хранилище состояний, то могу добавить его в преобразователь, но при вызове .aggregate()
я получаю ошибку,указывая, что он не может добавить хранилище состояний в этот момент, потому что мы уже добавили его ранее.
Materialized<String, MyState, KeyValueStore<Bytes, byte[]>> myStateStoreProvider =
Materialized.<String, Thermostat, KeyValueStore<Bytes, byte[]>>as("state_store_topic_name")
.withKeySerde(Serdes.String())
.withValueSerde(myStateSerde);
/* really don't think we should need this, but if I don't, the .transformValues
says it wasn't added to the topology... */
streamsBuilder.table("state_store_topic_name", myStateStoreProvider);
KStream<String, Message> stream = streamsBuilder.stream(config.getDefaultSourceTopicName(), Consumed.with(Serdes.String(), new MessageSerde()));
streamBuilder
.transformValues(
new MessagePreprocessorSupplier(
"state_store_topic_name"
),
"state_store_topic_name"
)
.groupByKey()
.aggregate(
() -> null,
new MyAggregator(),
myStateStoreProvider
);
Это поднимает:
org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore state_store_topic_name is already added.
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:523)
at org.apache.kafka.streams.kstream.internals.GroupedStreamAggregateBuilder.build(GroupedStreamAggregateBuilder.java:71)
at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.doAggregate(KGroupedStreamImpl.java:488)
at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.aggregateMaterialized(KGroupedStreamImpl.java:175)
at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.aggregate(KGroupedStreamImpl.java:167)
at myapp.stream_processor.KafkaApplication.configureTopology(KafkaApplication.java:48)