Я бы хотел настроить стратегию именования субъектов схемы Avro в источниках, потребителях и KStreams Spring Cloud Stream.
Это будет сделано в Кафке со свойствами key.subject.name.strategy
и value.subject.name.strategy
-> https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy
В родном Kafka Producer это работает:
private val producer: KafkaProducer<Int, Customer>
init {
val props = Properties()
...
props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
producer = KafkaProducer(props)
}
fun sendCustomerEvent(customer: Customer) {
val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
producer.send(record)
}
Однако я не могу найти, как это сделать в Spring Cloud Stream. До сих пор я пробовал это в продюсере:
spring:
application:
name: spring-boot-customer-service
cloud:
stream:
kafka:
bindings:
output:
producer:
configuration:
key:
serializer: org.apache.kafka.common.serialization.IntegerSerializer
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Очевидно, что Cloud Cloud использует собственную стратегию именования тем с интерфейсом org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
и только одним подклассом: DefaultSubjectNamingStrategy
.
Существует ли декларативный способ настройки value.subject.name.strategy
или мы должны предоставить собственную реализацию org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
и свойство spring.cloud.stream.schema.avro.subject-naming-strategy
?