Kafka Streams: сериализация обратно в авро - PullRequest
0 голосов
/ 04 апреля 2020

Я пытаюсь создать поток, который получает Avro Topi c, выполнить простое преобразование, а затем отправить его обратно в формате Avro в другой Topi c, и я как бы застрял в финале Часть сериализации.

У меня создана схема AVRO, я импортирую ее и использую для создания Specifi c Avro Serde. Но я не знаю, как сериализовать объект mov ie обратно в AVRO, используя этот класс.

Это класс потока:

class StreamsProcessor(val brokers: String, val schemaRegistryUrl: String) {

    private val logger = LogManager.getLogger(javaClass)

    fun process() {
        val streamsBuilder = StreamsBuilder()

        val avroSerde = GenericAvroSerde().apply {
            configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
        }

        val movieAvro = SpecificAvroSerde<Movie>().apply{
            configure(mapOf(Pair("schema.registry.url", schemaRegistryUrl)), false)
        }

        val movieAvroStream: KStream<String, GenericRecord> = streamsBuilder
                .stream(movieAvroTopic, Consumed.with(Serdes.String(), avroSerde))

        val movieStream: KStream<String, StreamMovie> = movieAvroStream.map {_, movieAvro ->
            val movie = StreamMovie(
                    movieId = movieAvro["name"].toString() + movieAvro["year"].toString(),
                    director = movieAvro["director"].toString(),
            )
             KeyValue("${movie.movieId}", movie)
        }

        // This where I'm stuck, the call is wrong because movieStream is not a <String, movieAvro> object 
        movieStream.to(movieTopic, Produced.with(Serdes.String(), movieAvro))

        val topology = streamsBuilder.build()

        val props = Properties()
        props["bootstrap.servers"] = brokers
        props["application.id"] = "movies-stream"
        val streams = KafkaStreams(topology, props)
        streams.start()
    }
}

Спасибо

1 Ответ

1 голос
/ 08 апреля 2020

Тип вашего потока результатов - KStream<String, StreamMovie>, и поэтому используемое значение Serde должно иметь тип SpecificAvroSerde<StreamMovie>.

Почему вы пытаетесь использовать SpecificAvroSerde<Movie>? Если Movie является желаемым типом вывода, вы должны создать Movie объект на вашем шаге map вместо StreamMovie объекта и соответственно изменить тип значения результата KStream.

Сравнить https://github.com/confluentinc/kafka-streams-examples/blob/5.4.1-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

...