потоковая искра 2.3.0 |Читайте Авро Кафка Рубрика |Обязательный атрибут 'value' не найден; - PullRequest
0 голосов
/ 27 февраля 2019

Я пытаюсь прочитать тему Avro, схему, описанную рядом с реестром схемы. Невозможно десериализовать и записать тему Avro в строковом формате в другую тему кафки ни обычными методами, ни последними методами from_avro.код, как показано ниже.Пожалуйста, поделитесь своим опытом.

Program
def main(args: Array[String]) {
 val spark = 
 SparkSession.builder().appName("KafkaAvroReader")
 .master("local[2]").getOrCreate()

import spark.implicits._

val envConf = ConfigFactory.load.getConfig(args(0))

val consumer_grp = args(2)
val topics = args(1)

   val kafkaParams = Map[String, String](
  "kafka.bootstrap.servers" -> envConf.getString("bootstrap.server"),
  "key.deserializer" -> "KafkaAvroDeserializer",
  "value.deserializer" -> "KafkaAvroDeserializer",
  ConsumerConfig.GROUP_ID_CONFIG -> "consumer_grp",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
  "failOnDataLoss" -> "false",
  "schema.registry.url" -> envConf.getString("schemaRegistryURL"))

val subjectValueName = topics + "-value"

 val avro_schema = new RestService(envConf.getString("schemaRegistryURL")).getLatestVersion(subjectValueName)
val messageSchema = new Schema.Parser().parse(avro_schema.getSchema).toString
val kafkaAVroTopic_Df = 
spark.readStream.format("Kafka").option("subscribe", 
topics).options(kafkaParams).load()
kafkaAVroTopic_Df.printSchema
val output = kafkaAVroTopic_Df.select(from_avro('value, 
messageSchema) as 'abc)
output.printSchema
output.
writeStream.format("kafka").option("kafka.bootstrap.servers", 
envConf.getString("bootstrap.server")).
option("topic", "spark6Stream").option("checkpointLocation", 
"/user/cloudera/spkcheckpnt31").start()
  Depedencies
  <properties>
  <scala.version>2.11</scala.version>
 <spark.version>2.4.0</spark.version>
 <kafka.version>1.1.1</kafka.version>
 <confluent.version>5.0.0</confluent.version>
 <kafkaclients.version>1.0.1</kafkaclients.version>
 </properties>
 <dependencies>
 <dependency>
 <groupId>io.confluent</groupId>
 <artifactId>kafka-streams-avro-serde</artifactId>
 <version>${confluent.version}</version>
 </dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${confluent.version}</version>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
</dependency>

 <!-- https://mvnrepository.com/artifact/org.apache.spark/spark- 
 streaming -->
 <dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-streaming_${scala.version}</artifactId>
 <version>${spark.version}</version>
 <scope>provided</scope>
 </dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -- 
>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>

 <!-- https://mvnrepository.com/artifact/org.apache.spark/spark- 
 streaming-kafka-0-10-assembly -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka_${scala.version}</artifactId>
 <version>${kafka.version}</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka- 
 clients -->
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>${kafkaclients.version}</version>
 </dependency>
 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>

   <!-- packages org.apache.spark:spark-avro_2.11:2.4.0 -->
   <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-avro_${scala.version}</artifactId>
   <version>2.4.0</version>

   </dependency>
   <!-- packages org.apache.spark:spark-avro_2.11:2.4.0 -->
   <dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-avro_${scala.version}</artifactId>
   <version>4.0.0</version>

Ошибка: скриншот также прикреплен. ОШИБКА Запрос [id = cd559067-939b-47f1-9cab-8268873fb210, runId = 22110ea8-858b-44c6-83c6-75ec7c77fcba] завершен с ошибкой(org.apache.spark.sql.execution.streaming.MicroBatchExecution: 91) org.apache.spark.sql.AnalysisException: обязательный атрибут 'value' не найден;в org.apache.spark.sql.kafka010.KafkaWriter $$ anonfun $ 6.apply (KafkaWriter.scala: 71) в org.apache.spark.sql.kafka010.KafkaWriter $$ anonfun $ 6.apply (KafkaWriter.scala: 71)в scala.Option.getOrElse (Option.scala: 121) в org.apache.spark.sql.kafka010.KafkaWriter $ .validateQuery (KafkaWriter.scala: 70) в org.apache.spark.sql.kafka010.KafkaSourceProvideercateKafkaSourceProvider.scala: 285) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org $ apache $ spark $ sql $ исполнительный $ потоковый $ MicroBatchExecution $$ runBatch (MicroBatchExecution.scala: 502) введите описание изображенияздесь at org.apache.spark.sql.execution.streaming.MicroBatchExecution $$ anonfun $ runActivationStream $ 1 $$ anonfun $ apply $ mcZ $ sp $ 1.apply $ mcV $ sp (MicroBatchExecution.scala: 198) в орг.apache.spark.sql.execution.streaming.MicroBatchExecution $$ anonfun $ runActivationStream $ 1 $$ anonfun $ apply $ mcZ $ sp $ 1.apply (MicroBatchExecution.scala: 166) в org.apache.spark.sql.execution.streaming.MicroBatchExecution$$ anonfun $ runActivatedStream $ 1 $$ anonfun $ apply $ mcZ $ sp $ 1.apply (MicroBatchExecution.scala: 166) в org.apache.spark.sql.execution.streaming.ProgressReporter $ class.reportTimeTaken (ProgressReporter.scala: 351) в org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken (StreamExecution.scala: 58) в org.apache.spark.sql.execution.streaming.MicroBatchExecution $$ anonfun $ runActivationStream $ 1.apply $ mcZ $ sp (MicroBatchExecu.schere.166) в org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute (TriggerExecutor.scala: 56) в org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivationStream (MicroBatchExecution.scala: 160 в.apache.spark.sql.execution.streaming.StreamExecution.org $ apache $ spark $ sql $ исполнение $ streaming $ StreamExecution $$ runStream (StreamExecution.scala: 279) в org.apache.spark.sql.execution.streaming.StreamExecution$$ anon $ 1.run (StreamExecution.scala: 189) Исключение в потоке "main" org.apache.spark.sql.streaming.StreamingQueryException: обязательный атрибут value 'not found

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...