Как повторно использовать хранилище состояний в агрегате и препроцессоре? - PullRequest
0 голосов
/ 25 мая 2018

У меня есть сообщения, которые я хотел бы объединить в состояние, но я также хотел бы использовать это состояние на этапе предварительной обработки, который также может обновлять другие состояния.Например, можно посмотреть, изменяет ли это сообщение какую-то часть состояния, и, если да, обновить тему, которая отслеживает эту конкретную часть состояния, а также само состояние.

Лучший способ, которым я мог быПредставьте, что для этого нужно было использовать Transformer, у которого был доступ к хранилищу состояний, но делайте это до агрегации состояния, чтобы я мог видеть значение этого состояния до его обновления.(Рассматриваемый Трансформер просто вернет исходное сообщение после возможного обновления дополнительного состояния.)

Однако я попадаю в проблему с курицей и яйцом:

  1. Если я добавлю Materialized с aggregate(), как показано ниже, я получу ошибку топологии, указывающую, что хранилище состояний, к которому я пытаюсь получить доступ, еще не добавлено в топологию.

    KStream<String, Message> stream = streamsBuilder.stream(config.getDefaultSourceTopicName(), Consumed.with(Serdes.String(), new MessageSerde()));
          streamBuilder
              .transformValues(
                  new MessagePreprocessorSupplier(
                      "state_store_topic_name"
                  ),
                  "state_store_topic_name"
              )
              .groupByKey()
              .aggregate(
                  () -> null,
                  new MyAggregator(),
                  Materialized.as("state_store_topic_name")
              );
    

    Это поднимает:

    org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore state_store_topic_name is not added yet.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStore(InternalTopologyBuilder.java:716)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStores(InternalTopologyBuilder.java:615)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.transformValues(KStreamImpl.java:546)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.transformValues(KStreamImpl.java:538)
    at myapp.stream_processor.KafkaApplication.configureTopology(KafkaApplication.java:48)
    
  2. Если я сначала пытаюсь создать хранилище состояний, то могу добавить его в преобразователь, но при вызове .aggregate() я получаю ошибку,указывая, что он не может добавить хранилище состояний в этот момент, потому что мы уже добавили его ранее.

    Materialized<String, MyState, KeyValueStore<Bytes, byte[]>> myStateStoreProvider =
        Materialized.<String, Thermostat, KeyValueStore<Bytes, byte[]>>as("state_store_topic_name")
            .withKeySerde(Serdes.String())
            .withValueSerde(myStateSerde);
    /* really don't think we should need this, but if I don't, the .transformValues
       says it wasn't added to the topology... */
    streamsBuilder.table("state_store_topic_name", myStateStoreProvider);
    
    KStream<String, Message> stream = streamsBuilder.stream(config.getDefaultSourceTopicName(), Consumed.with(Serdes.String(), new MessageSerde()));
          streamBuilder
              .transformValues(
                  new MessagePreprocessorSupplier(
                      "state_store_topic_name"
                  ),
                  "state_store_topic_name"
              )
              .groupByKey()
              .aggregate(
                  () -> null,
                  new MyAggregator(),
                  myStateStoreProvider
              );
    

    Это поднимает:

    org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore state_store_topic_name is already added.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:523)
    at org.apache.kafka.streams.kstream.internals.GroupedStreamAggregateBuilder.build(GroupedStreamAggregateBuilder.java:71)
    at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.doAggregate(KGroupedStreamImpl.java:488)
    at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.aggregateMaterialized(KGroupedStreamImpl.java:175)
    at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.aggregate(KGroupedStreamImpl.java:167)
    at myapp.stream_processor.KafkaApplication.configureTopology(KafkaApplication.java:48)
    

1 Ответ

0 голосов
/ 07 июня 2018

Это может сработать, если вы подключитесь к Processor API для подключения хранилища к преобразователю:

  • вы добавляете преобразователь без добавления состояния (поскольку он еще не существует)
  • вы регулярно добавляете свою агрегацию, так что создается состояние
  • после того, как вы позвонили builder.build(), вы звоните Topology#connectProcessorAndStateStore()
    • , вам нужно передать имя преобразователя и магазина
    • Вы можете получить имя трансформатора через Topology#describe()

Я не пробовал это, хотя ...

...