Я пытаюсь прочитать тему Avro, схему, описанную рядом с реестром схемы. Невозможно десериализовать и записать тему Avro в строковом формате в другую тему кафки ни обычными методами, ни последними методами from_avro.код, как показано ниже.Пожалуйста, поделитесь своим опытом.
Program
def main(args: Array[String]) {
val spark =
SparkSession.builder().appName("KafkaAvroReader")
.master("local[2]").getOrCreate()
import spark.implicits._
val envConf = ConfigFactory.load.getConfig(args(0))
val consumer_grp = args(2)
val topics = args(1)
val kafkaParams = Map[String, String](
"kafka.bootstrap.servers" -> envConf.getString("bootstrap.server"),
"key.deserializer" -> "KafkaAvroDeserializer",
"value.deserializer" -> "KafkaAvroDeserializer",
ConsumerConfig.GROUP_ID_CONFIG -> "consumer_grp",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
"failOnDataLoss" -> "false",
"schema.registry.url" -> envConf.getString("schemaRegistryURL"))
val subjectValueName = topics + "-value"
val avro_schema = new RestService(envConf.getString("schemaRegistryURL")).getLatestVersion(subjectValueName)
val messageSchema = new Schema.Parser().parse(avro_schema.getSchema).toString
val kafkaAVroTopic_Df =
spark.readStream.format("Kafka").option("subscribe",
topics).options(kafkaParams).load()
kafkaAVroTopic_Df.printSchema
val output = kafkaAVroTopic_Df.select(from_avro('value,
messageSchema) as 'abc)
output.printSchema
output.
writeStream.format("kafka").option("kafka.bootstrap.servers",
envConf.getString("bootstrap.server")).
option("topic", "spark6Stream").option("checkpointLocation",
"/user/cloudera/spkcheckpnt31").start()
Depedencies
<properties>
<scala.version>2.11</scala.version>
<spark.version>2.4.0</spark.version>
<kafka.version>1.1.1</kafka.version>
<confluent.version>5.0.0</confluent.version>
<kafkaclients.version>1.0.1</kafkaclients.version>
</properties>
<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-
streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --
>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-
streaming-kafka-0-10-assembly -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-
clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafkaclients.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- packages org.apache.spark:spark-avro_2.11:2.4.0 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.version}</artifactId>
<version>2.4.0</version>
</dependency>
<!-- packages org.apache.spark:spark-avro_2.11:2.4.0 -->
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_${scala.version}</artifactId>
<version>4.0.0</version>
Ошибка: скриншот также прикреплен. ОШИБКА Запрос [id = cd559067-939b-47f1-9cab-8268873fb210, runId = 22110ea8-858b-44c6-83c6-75ec7c77fcba] завершен с ошибкой(org.apache.spark.sql.execution.streaming.MicroBatchExecution: 91) org.apache.spark.sql.AnalysisException: обязательный атрибут 'value' не найден;в org.apache.spark.sql.kafka010.KafkaWriter $$ anonfun $ 6.apply (KafkaWriter.scala: 71) в org.apache.spark.sql.kafka010.KafkaWriter $$ anonfun $ 6.apply (KafkaWriter.scala: 71)в scala.Option.getOrElse (Option.scala: 121) в org.apache.spark.sql.kafka010.KafkaWriter $ .validateQuery (KafkaWriter.scala: 70) в org.apache.spark.sql.kafka010.KafkaSourceProvideercateKafkaSourceProvider.scala: 285) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org $ apache $ spark $ sql $ исполнительный $ потоковый $ MicroBatchExecution $$ runBatch (MicroBatchExecution.scala: 502) введите описание изображенияздесь at org.apache.spark.sql.execution.streaming.MicroBatchExecution $$ anonfun $ runActivationStream $ 1 $$ anonfun $ apply $ mcZ $ sp $ 1.apply $ mcV $ sp (MicroBatchExecution.scala: 198) в орг.apache.spark.sql.execution.streaming.MicroBatchExecution $$ anonfun $ runActivationStream $ 1 $$ anonfun $ apply $ mcZ $ sp $ 1.apply (MicroBatchExecution.scala: 166) в org.apache.spark.sql.execution.streaming.MicroBatchExecution$$ anonfun $ runActivatedStream $ 1 $$ anonfun $ apply $ mcZ $ sp $ 1.apply (MicroBatchExecution.scala: 166) в org.apache.spark.sql.execution.streaming.ProgressReporter $ class.reportTimeTaken (ProgressReporter.scala: 351) в org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken (StreamExecution.scala: 58) в org.apache.spark.sql.execution.streaming.MicroBatchExecution $$ anonfun $ runActivationStream $ 1.apply $ mcZ $ sp (MicroBatchExecu.schere.166) в org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute (TriggerExecutor.scala: 56) в org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivationStream (MicroBatchExecution.scala: 160 в.apache.spark.sql.execution.streaming.StreamExecution.org $ apache $ spark $ sql $ исполнение $ streaming $ StreamExecution $$ runStream (StreamExecution.scala: 279) в org.apache.spark.sql.execution.streaming.StreamExecution$$ anon $ 1.run (StreamExecution.scala: 189) Исключение в потоке "main" org.apache.spark.sql.streaming.StreamingQueryException: обязательный атрибут value 'not found