Как мы можем настроить value.subject.name.strategy для схем в Spring Cloud Stream Kafka производителей, потребителей и KStreams? - PullRequest
1 голос
/ 24 апреля 2019

Я бы хотел настроить стратегию именования субъектов схемы Avro в источниках, потребителях и KStreams Spring Cloud Stream.

Это будет сделано в Кафке со свойствами key.subject.name.strategy и value.subject.name.strategy -> https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy

В родном Kafka Producer это работает:


private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        ...
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }

    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }

Однако я не могу найти, как это сделать в Spring Cloud Stream. До сих пор я пробовал это в продюсере:

spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value:
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

Очевидно, что Cloud Cloud использует собственную стратегию именования тем с интерфейсом org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy и только одним подклассом: DefaultSubjectNamingStrategy.

Существует ли декларативный способ настройки value.subject.name.strategy или мы должны предоставить собственную реализацию org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy и свойство spring.cloud.stream.schema.avro.subject-naming-strategy?

Ответы [ 2 ]

1 голос
/ 25 апреля 2019

Вы можете объявить его в своих свойствах как

spring.cloud.stream.schema.avro.subjectNamingStrategy=MyStrategy

, где MyStrategy является реализацией интерфейса.Например

object MyStrategy: SubjectNamingStrategy {
   override fun toSubject(schema: Schema): String = schema.fullName
}
0 голосов
/ 09 июля 2019

Как указано в другом ответе, есть выделенное свойство , spring.cloud.stream.schema.avro.subjectNamingStrategy, которое позволяет установить другую стратегию именования для производителей Kafka .

Я предоставил org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy, который предоставляет эту функциональность из коробки.

В случае Kafka Streams и собственной сериализации / десериализации (поведение по умолчанию из Spring Cloud Streams 3.0.0+) необходимо использовать реализацию Confluent (io.confluent.kafka.serializers.subject.RecordNameStrategy) и собственные свойства:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      ...
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              ...
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...