Spark Структурированный поток, обогащенный Кассандрой - PullRequest
0 голосов
/ 16 ноября 2018

Я передаю данные из Кафки со структурированным потоком

  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("enable.auto.commit", false)
    .option("auto.offset.reset", "earliest")
    .option("group.id", UUID.randomUUID().toString)
    .option("subscribe", "test")
    .load()

, а затем попытайтесь присоединиться к нему за столом Кассандры

val d = df.select(from_json(col("value").cast("string"), schema).cast("string").alias("url"))
    .rdd.joinWithCassandraTable[(String, String, String)]("analytics", "nlp2", SomeColumns("url", "ner", "sentiment"), SomeColumns("url"))
    .toDS()
    .writeStream
    .format("console") // <-- use ConsoleSink
    .option("truncate", false)
    .option("numRows", 10)
    .trigger(Trigger.ProcessingTime(5 seconds))
    .queryName("rate-console")
    .start
    .awaitTermination()

но я получаю, когда я пытаюсь преобразовать фрейм данных в rdd, есть идеи, почему?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)

1 Ответ

0 голосов
/ 17 ноября 2018

Как говорится в сообщении об ошибке, запуск должен быть вызван круглыми скобками, как показано ниже:

 val d = df.select(from_json(col("value").cast("string"), schema).cast("string").alias("url"))
    .rdd.joinWithCassandraTable[(String, String, String)]("analytics", "nlp2", SomeColumns("url", "ner", "sentiment"), SomeColumns("url"))
    .toDS()
    .writeStream
    .format("console") // <-- use ConsoleSink
    .option("truncate", false)
    .option("numRows", 10)
    .trigger(Trigger.ProcessingTime(5 seconds))
    .queryName("rate-console")
    .start()
    .awaitTermination()
...