Kafka Streams (Scala): неверная топология: StateStore еще не добавлен - PullRequest
1 голос
/ 08 мая 2019

У меня есть топология, где у меня есть поток A.

Из этого потока A я создаю WindowedStore S.

       A  --> [S]

Затем я хочу преобразовать объекты в A в зависимости от данных на S, а также эти преобразованные объекты, чтобы они поступили в логику WindowStore (через transformValues).

Для этого я создаю для этого Трансформатор, создаю Поток A' и информирую о нем окошко (то есть теперь S будет сделано из A', а не из A).

  A -> A'  --> [S]
       ^__read__|

Но я не могу этого сделать, потому что, когда я создаю Топологию, выдается исключение:

Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.

Есть ли способ обойти это? Это ограничение?

Пример кода:

  // A 
  val sessionElementsStream: KStream[K, SessionElement] = ...
  // A'
  val sessionElementsTransformed : KStream[K, SessionElementTransformed] = {
    // Here we use the sessionStoreName - but it is not added yet to the Topology
    sessionElementsStream.
      transformValues(sessionElementTransformerSupplier, sessionStoreName)
  }

  val sessionElementsWindowedStream: SessionWindowedKStream[K, SessionElementTransformed] = {
    sessionElementsTransformed.
      groupByKey(sessionElementTransformedGroupedBy).
      windowedBy(sessionWindows)
  }

  val sessionStore : KTable[Windowed[K], List[WindowedSession]] = 
    sessionElementsWindowedStream.aggregate(
        initializer = List.empty[WindowedSession])(
        aggregator = anAggregator, merger = aMerger)(materialized = getMaterializedMUPKSessionStore(sessionStoreName))     

Первоначальная проблема заключается в том, что в зависимости от значений предыдущих сессий, я хотел бы изменить сессии после нее. Но если я сделаю это в преобразователе после сеанса, эти преобразованные сеансы могут быть изменены и отправлены в нисходящем направлении - но они не будут отражать свое новое состояние в S - поэтому дальнейшие запросы к хранилищу будут иметь старые значения.

Kafka Streams 2.1, Scala 2.12.4. Разделенные темы.

UPDATE

Есть способ сделать это в DSL, используя дополнительную тему:

  • Отправлено A 'to этой теме
  • Создайте builder.stream из этой темы и создайте из нее хранилище.
  • Определите хранилище перед тем, как определить преобразование (чтобы на этапе преобразования можно было использовать хранилище, поскольку оно уже определено ранее).

Тем не менее, звучит громоздко, чтобы использовать здесь дополнительную тему. Нет ли другого, более простого способа ее решить?

1 Ответ

0 голосов
/ 09 мая 2019

Но я не могу этого сделать, потому что при создании топологии выдается исключение:

Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore storeName is not added yet.

Похоже, вы просто забыли буквально "добавить" хранилище состоянийк вашей топологии обработки, а затем присоедините («сделайте доступным») хранилище состояний к вашему Transformer.

Вот фрагмент кода, демонстрирующий это (извините, в Java).

Добавлениехранилище состояний в вашей топологии:

final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder<KeyValueStore<String, Long> myStateStore =
    Stores.keyValueStoreBuilder(
             Stores.persistentKeyValueStore("my-state-store-name"),
             Serdes.String(),
             Serdes.Long())
          .withCachingEnabled();
builder.addStateStore(myStateStore);

Присоединение хранилища состояний к вашему Transformer:

final KStream<String, Double> stream = builder.stream("your-input-topic", Consumed.with(Serdes.String(), Serdes.Double()));

final KStream<String, Long> transformedStream =
    stream.transform(new YourTransformer(myStateStore.name()), myStateStore.name());

И, конечно, ваш Transformer должен интегрировать хранилище состояний с кодомкак следующее (это Transformer читает <String, Double> и пишет String, Long>).

class MyTransformer implements TransformerSupplier<String, Double, KeyValue<String, Long>> {

  private final String myStateStoreName;

  MyTransformer(final String myStateStoreName) {
    this.myStateStoreName = myStateStoreName;
  }

  @Override
  public Transformer<String, Double, KeyValue<String, Long>> get() {
    return new Transformer<String, Double, KeyValue<String, Long>>() {

      private KeyValueStore<String, Long> myStateStore;
      private ProcessorContext context;

      @Override
      public void init(final ProcessorContext context) {
        myStateStore = (KeyValueStore<String, Long>) context.getStateStore(myStateStoreName);
      }

   // ...
  }
}
...