Невозможно десериализовать сообщение avro с использованием искрового структурированного потока, где ключ сериализирован, а значение равно avro - PullRequest
0 голосов
/ 20 марта 2020

Использование Spark 2.4.0

Confluent schema-Registry для получения схемы

Сообщение Ключ сериализуется в String и Значение в Avro таким образом я пытаюсь десериализовать только значение с помощью io.confluent.kafka.serializers.KafkaAvroDeserializer, но это не работает. Может ли кто-нибудь просмотреть мой код, чтобы увидеть, что неправильно

импортированные библиотеки:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{ Encoder, SparkSession}

Тело кода

    val topics = "test_topic"
    val spark: SparkSession = SparkSession.builder
      .config("spark.streaming.stopGracefullyOnShutdown", "true")
      .config("spark.streaming.backpressure.enabled", "true")
      .config("spark.streaming.kafka.maxRatePerPartition", 2170)
      .config("spark.streaming.kafka.maxRetries", 1)
      .config("spark.streaming.kafka.consumer.poll.ms", "600000")
      .appName("SparkStructuredStreamAvro")
      .config("spark.sql.streaming.checkpointLocation", "/tmp/new_checkpoint/")
      .enableHiveSupport()
      .getOrCreate


    //add settings for schema registry url, used to get deser
    val schemaRegUrl = "http://xx.xx.xx.xxx:xxxx"
    val client = new CachedSchemaRegistryClient(schemaRegUrl, 100)

    //subscribe to kafka
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "xx.xx.xxxx")
      .option("subscribe", "test.topic")
      .option("kafka.startingOffsets", "latest")
      .option("group.id", "use_a_separate_group_id_for_each_stream")
      .load()

    //add confluent kafka avro deserializer, needed to read messages appropriately
    val deser = new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]]

    //needed to convert column select into Array[Bytes]
    import spark.implicits._

    val results = df.select(col("value").as[Array[Byte]]).map { rawBytes: Array[Byte] =>
      //read the raw bytes from spark and then use the confluent deserializer to get the record back

      val decoded = deser.deserialize(topics, rawBytes)
      val recordId = decoded.get("nameId").asInstanceOf[org.apache.avro.util.Utf8].toString
      recordId
    }


    results.writeStream
      .outputMode("append")
      .format("text")
      .option("path", "/tmp/path_new/")
      .option("truncate", "false")
      .start()
      .awaitTermination()
    spark.stop()

Не удалось десериализоваться, и полученная ошибка равна

Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroDeserializer
Serialization stack:
        - object not serializable (class: io.confluent.kafka.serializers.KafkaAvroDeserializer, value: io.confluent.kafka.serializers.KafkaAvroDeserializer@591024db)
        - field (class: ca.bell.wireless.ingest$$anonfun$1, name: deser$1, type: interface org.apache.kafka.common.serialization.Deserializer)
        - object (class ca.bell.wireless.ingest$$anonfun$1, <function1>)
        - element of array (index: 1)

Он прекрасно работает, когда я пишу обычному потребителю кафки (не через искру), используя

    props.put("key.deserializer", classOf[StringDeserializer])
    props.put("value.deserializer", classOf[KafkaAvroDeserializer])

1 Ответ

0 голосов
/ 21 марта 2020

Вы определили переменную ('deser') для KafkaAvroDeserializer вне блока карты. это делает это исключение.

Попробуйте изменить код следующим образом:

val brdDeser = spark.sparkContext.broadcast(new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]])

val results = df.select(col("value").as[Array[Byte]]).map { rawBytes: Array[Byte] =>
      val deser = brdDeser.value
      val decoded = deser.deserialize(topics, rawBytes)
      val recordId = decoded.get("nameId").asInstanceOf[org.apache.avro.util.Utf8].toString
      recordId
    }
...