Я понимаю вашу путаницу здесь, но оптимизация для тем журнала изменений предназначена только для источника KTable
операций. Например:
KTable<String, String> someTable = builder.table("topic");
При включенных оптимизациях Kafka Streams будет использовать исходный файл topi c для KTable
в качестве списка изменений topi c.
Если вы не хотите, чтобы Kafka Streams создавал список изменений topi c для вашей агрегации, тогда вам необходимо явно отключить его с помощью Materialized.withLoggingDisabled()
config объекта:
streamData.groupByKey(Grouped.with(jsonSerde,jsonSerde)) .aggregate(initializer, aggregator, Materialized.as("store-name").withLoggingDisabled();
Выше приведен только один пример; Вы также можете использовать Materialized
для настройки ключа и значения Serdes
, если требуется. Но суть в том, чтобы предотвратить темы изменений, которые вам нужно использовать Materialized.withLoggingDisabled()
.
Кроме того, вы можете настроить разделы журнала изменений, используя StoreBuilder
. Javado c для Stores
имеет краткий пример использования StoreBuilder
.