Стадия агрегации Kafka Streams сериализует и десериализует каждый отдельный элемент? - PullRequest
8 голосов
/ 29 мая 2019

Я заметил, что стадия aggregate(), кажется, сериализует / десериализует каждый отдельный элемент, хотя периодически генерирует результат.

  streamBuilder
      .stream(inputTopic, Consumed.`with`(keySerde, inputValueSerde))
      .groupByKey(Serialized.`with`(keySerde, inputValueSerde))
      .aggregate(
        () => Snapshot.Initial(),
        (_, event, prevSnap: Snapshot) => {
          // ...
        },
        Materialized.as(stateStoreName).withValueSerde(snapshotSerde)
      )
      .toStream()

Я надеялся, что хранилище значений ключей будет работать в памяти, пока не произойдет запись при фиксации. Похоже, что для каждого обновления производятся не только записи, но и операции чтения, которые десериализуются обратно. Может кто-нибудь объяснить, как это работает внизу и стоит ли мне беспокоиться о производительности?

1 Ответ

0 голосов
/ 03 июня 2019

Ваше наблюдение о том, что данные всегда (де) сериализуются, является правильным, даже если все данные находятся в памяти.Все хранилища в Kafka Streams основаны на массивах byte[], чтобы обеспечить правильное управление памятью.Головные объекты Java десериализатора имеют неизвестный размер и делают управление памятью трудным и использование памяти непредсказуемым.

Ваше хранилище будет по-прежнему работать в памяти, а запись на диск происходит только при необходимости и при фиксации.

...