Как правильно настроить конфигурацию spring-boot kafka-streams в файле свойств? - PullRequest
1 голос
/ 13 июня 2019

Я пытаюсь вывести конфигурацию приложения spring-kafka, которое я сейчас написал в коде Java.Должен ли я помещать значения ProducerConfig и ConsumerConfig в spring.kafka.streams.properties, или они будут правильно настроены, если я предоставлю их через spring.kafka.producer и spring.kafka.consumer?

Пока что кажется, что яя должен получить всю свою конфигурацию в bean-компоненте типа KafkaStreamsConfiguration, чтобы настроить мое приложение kafka-streams.В настоящее время я делаю это, устанавливая значения ProducerConfig и ConsumerConfig непосредственно в коде.

Когда я экстернализую эту конфигурацию, кажется, что установка значений свойств из ProducerConfig и ConsumerConfig в *Файл 1015 * не коррелирует с тем, что он находится в KafkaStreamsConfiguration, созданном подпружиненной загрузкой (я подтвердил это, автоматически подключив конфигурацию куда-то и посмотрев на нее).

Если я вместо этого предоставлю ProducerConfigи от ConsumerConfig значений до spring.kafka.streams.properties они отображаются в KafkaStreamsConfiguration.

Вот моя старая конфигурация Java:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put("replication.factor", replicationFactor);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "600000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new KafkaStreamsConfiguration(props);
    }

это заканчивается ProducerConfig и ConsumerConfig значения, отсутствующие в KafkaStreamsConfiguration во время выполнения:

spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.consumer.group-id=<group_id> #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.consumer.auto-offset-reset=latest #this won't show up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor

Это, однако, приводит к тому, что KafkaStreamsConfiguration имеет значения, как и ожидалось:

spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.properties.schema.registry.url=http://localhost:8081
spring.kafka.streams.application-id=<application_id>
spring.kafka.streams.properties.group-id=<group_id> #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.commit.interval.ms=100
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.streams.properties.compression-type=lz4 #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.state.cleanup.delay.ms=600000
spring.kafka.streams.properties.auto-offset-reset=latest #this shows up in KafkaStreamsConfiguration
spring.kafka.streams.properties.timestamp.extractor=org.apache.kafka.streams.processor.WallclockTimestampExtractor

Я ожидалЗначения ProducerConfig и ConsumerConfig распространяются на KafkaStreamsConfiguration при настройке через spring.kafka.producer и spring.kafka.consumer соответственно.Тем более, что я получаю Intellisense в IntelliJ для конфигов Producer и Consumer в application.properties.

Тем не менее, мне нужно убедиться, что я устанавливаю их через spring.kafka.streams.properties, чтобы приложение работало правильносконфигурировано

1 Ответ

0 голосов
/ 13 июня 2019

spring.kafka.consumer.group-id=<group_id> #this won't show up in KafkaStreamsConfiguration

Streams устанавливает group.id в свойство application.id.

public static final String APPLICATION_ID_CONFIG = "application.id";

private static final String APPLICATION_ID_DOC =" Идентификатор приложения обработки потока. Должен быть уникальным в кластере Kafka. Он используется как 1) префикс идентификатора клиента по умолчанию, 2) идентификатор группыдля управления членством 3) префикс темы журнала изменений. ";

См. KafkaProperties.

streams, producer и consumer свойства различны и не связаны.

spring.kafka.producer.compression-type=lz4 #this won't show up in KafkaStreamsConfiguration

compression-type не предоставляется как загрузочное свойство первого класса для потоков.Вы можете установить его, используя

spring.kafka.streams.properties.compression.type=gzip
...