Выполнить модель на основе Python в программе Scala на основе структурированного потокового воспроизведения. - PullRequest
0 голосов
/ 22 января 2019

У меня есть программа структурированного потокового вещания на основе scala, которая должна выполнять модель на основе Python.

В предыдущей версии spark (1.6.x) я делал это путем преобразования DStream в RDD, а затем вызывал метод rdd.pipe.

Однако этот подход не работает для структурированной потоковой передачи. Выдает следующую ошибку:

Запросы с потоковыми источниками должны выполняться с помощью writeStream.start ()

Фрагмент кода выглядит следующим образом:

val sourceDF = spark.readStream.option("header","true").schema(schema).csv("/Users/user/Desktop/spark_tutorial/")
val rdd: RDD[String] = sourceDF.rdd.map(row => row.mkString(","))
val pipedRDD: RDD[String] = rdd.pipe("/Users/user/Desktop/test.py")

import org.apache.spark.sql._
val rowRDD : RDD[Row] = pipedRDD.map(row => Row.fromSeq(row.split(",")))


val newSchema = <code to create new schema>

val newDF = spark.createDataFrame(rowRDD, newSchema)
val query = newDF.writeStream.format("console").outputMode(OutputMode.Append()).start
query.awaitTermination()

Трассировка стека исключений:

19/01/22 00:10:00 INFO StateStoreCoordinatorRef: Зарегистрированная конечная точка StateStoreCoordinator Исключение в потоке "main" org.apache.spark.sql.AnalysisException: Запросы с потоковыми источниками должны выполняться с помощью writeStream.start () ;; FileSource [/ Users / пользователя / Desktop / spark_tutorial /] at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .org $ apache $ spark $ sql $ катализатор $ анализ $ UnsupportedOperationChecker $$ throwError (UnsupportedOperationChecker.scala: 374) в org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply (UnsupportedOperationChecker.scala: 37) в org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $$ anonfun $ checkForBatch $ 1.apply (UnsupportedOperationChecker.scala: 35) в org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp (TreeNode.scala: 127) в org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ foreachUp $ 1.apply (TreeNode.scala: 126) в org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ foreachUp $ 1.apply (TreeNode.scala: 126) в scala.collection.immutable.List.foreach (List.scala: 392) в org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp (TreeNode.scala: 126) в org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker $ .checkForBatch (UnsupportedOperationChecker.scala: 35) в org.apache.spark.sql.execution.QueryExecution.assertSupported (QueryExecution.scala: 51) в org.apache.spark.sql.execution.QueryExecution.withCachedData $ lzycompute (QueryExecution.scala: 62) в org.apache.spark.sql.execution.QueryExecution.withCachedData (QueryExecution.scala: 60) в org.apache.spark.sql.execution.QueryExecution.optimizedPlan $ lzycompute (QueryExecution.scala: 66) в org.apache.spark.sql.execution.QueryExecution.optimizedPlan (QueryExecution.scala: 66) в org.apache.spark.sql.execution.QueryExecution.sparkPlan $ lzycompute (QueryExecution.scala: 72) в org.apache.spark.sql.execution.QueryExecution.sparkPlan (QueryExecution.scala: 68) в org.apache.spark.sql.execution.QueryExecution.executedPlan $ lzycompute (QueryExecution.scala: 77) в org.apache.spark.sql.execution.QueryExecution.executedPlan (QueryExecution.scala: 77) в org.apache.spark.sql.execution.QueryExecution.toRdd $ lzycompute (QueryExecution.scala: 80) в org.apache.spark.sql.execution.QueryExecution.toRdd (QueryExecution.scala: 80) в org.apache.spark.sql.Dataset.rdd $ lzycompute (Dataset.scala: 2975) в org.apache.spark.sql.Dataset.rdd (Dataset.scala: 2973) на тесте $ .main (Test.scala: 20) в Test.main (Test.scala)

Есть предложения?

...