Приложение Kafka Streams все еще пытается создать список изменений topi c, хотя я устанавливаю свойство оптимизации - PullRequest
1 голос
/ 28 апреля 2020

Ниже приведен фрагмент кода, который я использую,

`streamsConfiguration.put (StreamsConfig.APPLICATION_ID_CONFIG," wordcount-live-test "); streamsConfiguration.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerIP: port"); streamsConfiguration.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put ( StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE );

Построитель StreamBuilder = new StreamBuilder (); KStream streamData = builder.stream (inputTopicName);

streamData.groupByKey (Grouped.with (jsonSerde, jsonSerde)) .aggregate (// некоторые преобразования);

KafkaStreams kafkaStreams = new KafkaStreams = new KafkaStreams = new builder.build (streamConfiguration), streamConfiguration); `

Я проверил страницу слияния на предмет оптимизации и следовал предложенным изменениям. но все же он пытается сгенерировать changelogTopi c.

1 Ответ

1 голос
/ 28 апреля 2020

Я понимаю вашу путаницу здесь, но оптимизация для тем журнала изменений предназначена только для источника KTable операций. Например:

KTable<String, String> someTable = builder.table("topic");

При включенных оптимизациях Kafka Streams будет использовать исходный файл topi c для KTable в качестве списка изменений topi c.

Если вы не хотите, чтобы Kafka Streams создавал список изменений topi c для вашей агрегации, тогда вам необходимо явно отключить его с помощью Materialized.withLoggingDisabled() config объекта:

streamData.groupByKey(Grouped.with(jsonSerde,jsonSerde)) .aggregate(initializer, aggregator, Materialized.as("store-name").withLoggingDisabled();

Выше приведен только один пример; Вы также можете использовать Materialized для настройки ключа и значения Serdes, если требуется. Но суть в том, чтобы предотвратить темы изменений, которые вам нужно использовать Materialized.withLoggingDisabled().

Кроме того, вы можете настроить разделы журнала изменений, используя StoreBuilder. Javado c для Stores имеет краткий пример использования StoreBuilder.

...