Можно ли определить выходной связыватель пружинных потоков в .yml с помощью avro? - PullRequest
0 голосов
/ 17 апреля 2019

я создаю приложение spring-cloud-streams, в котором вывод имеет вид AVRO, мне удалось заставить его работать, однако мне не очень нравится это для каждого выходного ключа / значения SpecificAvroSerde, мне нужно установить schemaRegistryUrl, вызовите configure

final SpecificAvroSerde serde = new SpecificAvroSerde();
serde.configure(
  Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl),
    true)

и обработать поток с помощью .to (String topic, ProducedWith ...), и не могу использовать аннотации @SendTo("output") springs и определение процессора потока. В свойстве spring.cloud.stream.kafka.streams.bindings.output.producer могут быть установлены keySerde и valueSerde, но это не имеет никакого эффекта, поскольку просто не выполняет самоконфигурирование, даже если задан schema.registry.url. Можно ли заставить его работать только с конфигурацией .yml

spring.cloud.stream:
  kafka.streams:
    binder:
      applicationId: stream-app
      brokers: localhost:9092
      autoCreateTopics: true
      configuration:
        schema.registry:
          url: http://localhost:8081
        commit.interval.ms: 1000
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
    bindings:
      input:
        ...

      output:
        producer:
          keySerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

  bindings:
    simple-data-stream-input:
      contentType: application/json
      destination: INPUT_TOPIC
    simple-data-stream-output:
      contentType: application/avro
      destination: OUTPUT_TOPIC

, определение потокового процессора

public interface SimpleDataStreamProcessor {
  @Input(SimpleDataStream.INPUT)
  KStream<?, ?> input();

  @Output(SimpleDataStream.OUTPUT)
  KStream<?, ?> output();
}

и

@StreamListener("input")
@SendTo("output")
public KStream<SomeAvroKey, SomeAvroValue> process(KStream<String, String> inputStream) {
   return ...;
}

...