Я пытаюсь создать поток, который получает Avro Topi c, выполнить простое преобразование, а затем отправить его обратно в формате Avro в другой Topi c, и я как бы застрял в финале Часть сериализации.
У меня создана схема AVRO, я импортирую ее и использую для создания Specifi c Avro Serde. Но я не знаю, как сериализовать объект mov ie обратно в AVRO, используя этот класс.
Это класс потока:
class StreamsProcessor(val brokers: String, val schemaRegistryUrl: String) {
private val logger = LogManager.getLogger(javaClass)
fun process() {
val streamsBuilder = StreamsBuilder()
val avroSerde = GenericAvroSerde().apply {
configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
}
val movieAvro = SpecificAvroSerde<Movie>().apply{
configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
}
val movieAvroStream: KStream<String, GenericRecord> = streamsBuilder
.stream(movieAvroTopic, Consumed.with(Serdes.String(), avroSerde))
val movieStream: KStream<String, StreamMovie> = movieAvroStream.map {_, movieAvro ->
val movie = StreamMovie(
movieId = movieAvro["name"].toString() + movieAvro["year"].toString(),
director = movieAvro["director"].toString(),
)
KeyValue("${movie.movieId}", movie)
}
// This where I'm stuck, the call is wrong because movieStream is not a <String, movieAvro> object
movieStream.to(movieTopic, Produced.with(Serdes.String(), movieAvro))
val topology = streamsBuilder.build()
val props = Properties()
props["bootstrap.servers"] = brokers
props["application.id"] = "movies-stream"
val streams = KafkaStreams(topology, props)
streams.start()
}
}
Спасибо