Запись содержимого фрейма искры в Kafka с помощью Spark 2.2 - PullRequest
0 голосов
/ 18 марта 2019

Я обращался к документации на http://spark.apache.org/docs/2.2.2/structured-streaming-kafka-integration.html#writing-the-output-of-batch-queries-to-kafka

Это мой код.

val df = Seq(("1","One"),("2","two")).toDF("key","value")
df.printSchema()
df.show(false)
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("topic", "testtopic")
  .save()

Это сообщение об ошибке, когда я выполняю этот код.

[Stage 0:>                                                          (0 + 2) / 2]Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$3()Lscala/Option;
    at org.apache.spark.sql.kafka010.KafkaWriteTask.createProjection(KafkaWriteTask.scala:112)
    at org.apache.spark.sql.kafka010.KafkaWriteTask.<init>(KafkaWriteTask.scala:39)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:90)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

зависимости в моем файле pom ...

 <dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.2.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.2</version>
</dependency>

Кафка версия kafka_2.11-1.1.0

Ценю вашу помощь. Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...