я создаю приложение spring-cloud-streams, в котором вывод имеет вид AVRO, мне удалось заставить его работать, однако мне не очень нравится это для каждого выходного ключа / значения SpecificAvroSerde, мне нужно установить schemaRegistryUrl, вызовите configure
final SpecificAvroSerde serde = new SpecificAvroSerde();
serde.configure(
Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl),
true)
и обработать поток с помощью .to (String topic, ProducedWith ...), и не могу использовать аннотации @SendTo("output")
springs и определение процессора потока.
В свойстве spring.cloud.stream.kafka.streams.bindings.output.producer
могут быть установлены keySerde и valueSerde, но это не имеет никакого эффекта, поскольку просто не выполняет самоконфигурирование, даже если задан schema.registry.url. Можно ли заставить его работать только с конфигурацией .yml
spring.cloud.stream:
kafka.streams:
binder:
applicationId: stream-app
brokers: localhost:9092
autoCreateTopics: true
configuration:
schema.registry:
url: http://localhost:8081
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
bindings:
input:
...
output:
producer:
keySerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
simple-data-stream-input:
contentType: application/json
destination: INPUT_TOPIC
simple-data-stream-output:
contentType: application/avro
destination: OUTPUT_TOPIC
, определение потокового процессора
public interface SimpleDataStreamProcessor {
@Input(SimpleDataStream.INPUT)
KStream<?, ?> input();
@Output(SimpleDataStream.OUTPUT)
KStream<?, ?> output();
}
и
@StreamListener("input")
@SendTo("output")
public KStream<SomeAvroKey, SomeAvroValue> process(KStream<String, String> inputStream) {
return ...;
}