Я пытаюсь вывести конфигурацию приложения spring-kafka, которое я сейчас написал в коде Java.Должен ли я помещать значения ProducerConfig
и ConsumerConfig
в spring.kafka.streams.properties
, или они будут правильно настроены, если я предоставлю их через spring.kafka.producer
и spring.kafka.consumer
?
Пока что кажется, что яя должен получить всю свою конфигурацию в bean-компоненте типа KafkaStreamsConfiguration
, чтобы настроить мое приложение kafka-streams.В настоящее время я делаю это, устанавливая значения ProducerConfig
и ConsumerConfig
непосредственно в коде.
Когда я экстернализую эту конфигурацию, кажется, что установка значений свойств из ProducerConfig
и ConsumerConfig
в *Файл 1015 * не коррелирует с тем, что он находится в KafkaStreamsConfiguration
, созданном подпружиненной загрузкой (я подтвердил это, автоматически подключив конфигурацию куда-то и посмотрев на нее).
Если я вместо этого предоставлю ProducerConfig
и от ConsumerConfig
значений до spring.kafka.streams.properties
они отображаются в KafkaStreamsConfiguration
.
Вот моя старая конфигурация Java:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put("replication.factor", replicationFactor);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "600000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new KafkaStreamsConfiguration(props);
}
это заканчивается ProducerConfig
и ConsumerConfig
значения, отсутствующие в KafkaStreamsConfiguration
во время выполнения:
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.consumer.group-id=<group_id> #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.consumer.auto-offset-reset=latest #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor
Это, однако, приводит к тому, что KafkaStreamsConfiguration
имеет значения, как и ожидалось:
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.streams.properties.group-id=<group_id> #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.streams.properties.compression-type=lz4 #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.streams.properties.auto-offset-reset=latest #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor
Я ожидалЗначения ProducerConfig
и ConsumerConfig
распространяются на KafkaStreamsConfiguration
при настройке через spring.kafka.producer
и spring.kafka.consumer
соответственно.Тем более, что я получаю Intellisense в IntelliJ для конфигов Producer и Consumer в application.properties
.
Тем не менее, мне нужно убедиться, что я устанавливаю их через spring.kafka.streams.properties
, чтобы приложение работало правильносконфигурировано