Как десериализовать сообщения Avro от Kafka в Flink (Scala)? - PullRequest
0 голосов
/ 12 марта 2019

Я читаю сообщения от Кафки в Flink Shell (Scala) следующим образом:

scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091

Здесь я использую SimpleStringSchema () в качестве десериализатора, но на самом деле сообщения имеют другую схему Avro (скажем, msg.avsc). Как создать десериализатор на основе этой другой схемы Avro (msg.avsc) для десериализации входящих сообщений Kafka?

Мне не удалось найти никаких примеров кода или учебных пособий для этого в Scala, поэтому любые входные данные могут помочь. Похоже, мне может понадобиться расширить и реализовать

org.apache.flink.streaming.util.serialization.DeserializationSchema

для декодирования сообщений, но я не знаю, как это сделать. Любые учебники или инструкции будут очень полезны. Поскольку я не хочу выполнять какую-либо пользовательскую обработку, а просто анализировать сообщения в соответствии со схемой Avro (msg.avsc), любые быстрые способы сделать это были бы очень полезны.

1 Ответ

0 голосов
/ 13 марта 2019

Я нашел пример для класса AvroDeserializationSchema в Java

https://github.com/okkam-it/flink-examples/blob/master/src/main/java/org/okkam/flink/avro/AvroDeserializationSchema.java

Фрагмент кода:

Если вы хотите десериализовать в конкретный класс дел, используйте new FlinkKafkaConsumer011[case_class_name], new AvroDeserializationSchema[case_class_name](classOf[case_class_name]

val stream = env .addSource(new FlinkKafkaConsumer011[DeviceData]
 ("test", new AvroDeserializationSchema[case_class_name](classOf[case_class_name]), properties))

Если вы используете реестр схемы Confluent, то предпочтительным решением будет использование Avro serde, предоставленного Confluent. Мы просто вызываем функцию deserialize (), и разрешение последней версии схемы Avro для использования выполняется автоматически за сценой, и манипулирование байтами не требуется.

Что-то вроде ниже в скале.

import io.confluent.kafka.serializers.KafkaAvroDeserializer

...

val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, 
  false)

...

override def deserialize(messageKey: Array[Byte], message: Array[Byte], 
                       topic: String, partition: Int, offset: Long): KafkaKV = {

    val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
    val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

    KafkaKV(key, value)
    }

...

подробное объяснение здесь: http://svend.kelesia.com/how-to-integrate-flink-with-confluents-schema-registry.html#how-to-integrate-flink-with-confluents-schema-registry

Надеюсь, это поможет!

...