Ошибка потоковой передачи искры: java.lang.IllegalArgumentException: java.net.URISyntaxException: Относительный путь в абсолютном URI: - PullRequest
0 голосов
/ 17 февраля 2019

Я пытаюсь транслировать данные из темы kafka avro.

Ниже приведен мой фрагмент кода:

val sparkStreamingContext = new StreamingContext(sc, Durations.seconds(60))
val brokers = "Broker info"
val schemaRegistryURL = "URL schema registry "
val subjectValueName = "topicname" + "-value"
val restService = new RestService(schemaRegistryURL)
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

val kafkaParam = new mutable.HashMap[String, String]()
kafkaParam.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
kafkaParam.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParam.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParam.put(ConsumerConfig.GROUP_ID_CONFIG, "streaming-kafka")
kafkaParam.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,  "latest")
kafkaParam.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

val topicList = List("topicname")
val messageStream = KafkaUtils.createDirectStream(sparkStreamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicList, kafkaParam))
val TRANSACTION_SCHEMA: StructType = SchemaConverters.toSqlType(topicValueAvroSchema).dataType.asInstanceOf[StructType]
messageStream.foreachRDD { rdd =>
    val streamData = spark.read.schema(TRANSACTION_SCHEMA).avro(rdd.map(x => x.value()).toString())
    streamData.repartition(1).write.format("com.databricks.spark.avro").mode("Append") saveAsTable ("tablename")
    }
}
sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()

Но я получаю ниже ошибку, может кто-нибудь помочь исправить это.

java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: MapPartitionsRDD[75] at map at <console>:54
...