Я заметил, что стадия aggregate()
, кажется, сериализует / десериализует каждый отдельный элемент, хотя периодически генерирует результат.
streamBuilder
.stream(inputTopic, Consumed.`with`(keySerde, inputValueSerde))
.groupByKey(Serialized.`with`(keySerde, inputValueSerde))
.aggregate(
() => Snapshot.Initial(),
(_, event, prevSnap: Snapshot) => {
// ...
},
Materialized.as(stateStoreName).withValueSerde(snapshotSerde)
)
.toStream()
Я надеялся, что хранилище значений ключей будет работать в памяти, пока не произойдет запись при фиксации. Похоже, что для каждого обновления производятся не только записи, но и операции чтения, которые десериализуются обратно. Может кто-нибудь объяснить, как это работает внизу и стоит ли мне беспокоиться о производительности?