Как установить свойства Kafka Streams в коде при использовании Spring Cloud Streams? - PullRequest
0 голосов
/ 31 января 2019

В Spring Boot с Kafka я могу установить свойства для ConsumerFactory следующим образом:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Bean
public ConsumerFactory<String, EnrichedOrder> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "barnwaldo");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EnrichedOrderDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, EnrichedOrder> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, EnrichedOrder> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

}

Используя потоки Kafka, я могу установить свойства в коде следующим образом:

    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

При работе с Spring Cloud Streams и Kafka Streams все свойства отображаются для ввода только через файлы application.properties или application.yml в папке ресурсов, такие как

spring.cloud.stream.bindings:
    output:
        contentType: application/json
        destination: data2
    input:
        contentType: application/json
        destination: data1
spring.cloud.stream.kafka.streams:
    binder:
      brokers: localhost
      configuration:
        commit.interval.ms: 1000
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    bindings.input.consumer:
        applicationId: data-tester

Есть лиспособ включить свойства в HashMap или Properties при использовании Spring Cloud Streams с Kafka Streams.

Возможно, это можно как-то сделать с помощью KafkaMessageChannelBinder или путем расширения AbstractMessageChannelBinder - см. https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/7355ada4613ad50fe95430f1859d4ea65f004be1/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java.

Я не могу найти никакой документации, ссылающейся на это;любая помощь очень ценится.

Ответы [ 2 ]

0 голосов
/ 01 февраля 2019

Спасибо за ваш ответ - я часто ссылался на первую ссылочную ссылку и буду рассматривать класс KafkaStreamsBinderSupportAutoConfiguration из ссылки Github.

Возможно, я смогу задать более конкретный вопрос о свойствах ...

Насколько я понимаю, потоки Kafka могут выполнять безопасную обработку потоков, настраивая потоки Kafka для шифрования передаваемых данных (при обмене данными с целевым кластером Kafka) и включения аутентификации клиента.В реализации Spring Cloud Streams Kafka Streams, как можно реализовать следующее в файле свойств application.yml?В частности, свойства SECURITY и SSL?

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "secure-kafka-streams-app");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "secure-kafka-streams-app-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, secureBootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.truststore.jks");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
streamsConfiguration.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.keystore.jks");
streamsConfiguration.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
streamsConfiguration.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("secure-input").to("secure-output");
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

Опять же, любая помощь очень ценится.

0 голосов
/ 31 января 2019

По умолчанию он имеет поддержку на уровне подшивки, где свойства должны иметь префикс spring.cloud.stream.kafka.streams.binder. literal

https://cloud.spring.io/spring-cloud-static/Greenwich.M3/multi/multi__apache_kafka_streams_binder.html#_configuration_options_3

Если вы видите класс KafkaStreamsBinderSupportAutoConfiguration, вы можете увидетьконфигурация bean-компонента, которая читает свойства yaml и задает потоки kafka.

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/master/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java

...