В Spring Boot с Kafka я могу установить свойства для ConsumerFactory следующим образом:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, EnrichedOrder> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "barnwaldo");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EnrichedOrderDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, EnrichedOrder> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, EnrichedOrder> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Используя потоки Kafka, я могу установить свойства в коде следующим образом:
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
При работе с Spring Cloud Streams и Kafka Streams все свойства отображаются для ввода только через файлы application.properties или application.yml в папке ресурсов, такие как
spring.cloud.stream.bindings:
output:
contentType: application/json
destination: data2
input:
contentType: application/json
destination: data1
spring.cloud.stream.kafka.streams:
binder:
brokers: localhost
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings.input.consumer:
applicationId: data-tester
Есть лиспособ включить свойства в HashMap или Properties при использовании Spring Cloud Streams с Kafka Streams.
Возможно, это можно как-то сделать с помощью KafkaMessageChannelBinder или путем расширения AbstractMessageChannelBinder - см. https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/7355ada4613ad50fe95430f1859d4ea65f004be1/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java.
Я не могу найти никакой документации, ссылающейся на это;любая помощь очень ценится.