Spark структурированная ошибка потоковой передачи - PullRequest
0 голосов
/ 22 мая 2018

Я пытаюсь выполнить следующий код:

import org.apache.spark.sql.types._

val schema = StructType(
StructField("id", LongType, false) ::
StructField("name", StringType, true) ::
StructField("city", StringType, true) :: Nil)


case class Person(id: Long, name: String, city: String)

import org.apache.spark.sql.Encoders

val schema = Encoders.product[Person].schema


val people = spark.readStream.schema(schema).csv("/data/pncdw/scratch/test/*.csv").as[Person]

val population = people.groupBy('city).agg(count('city) as "population")

import scala.concurrent.duration._

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val populationStream = population.
writeStream.
format("console").
trigger(Trigger.ProcessingTime(30.seconds)).
outputMode(OutputMode.Complete).
queryName("textStream").start

, но я получаю эту ошибку:

ERROR : scala> populationStream.trigger(Trigger.once)
<console>:43: error: not found: value Trigger
populationStream.trigger(Trigger.once)

Я загрузил все банки и зависимости, как показано ниже.

spark-shell --jars "/opt/projects/user/PLT8139/testjars/spark-xml_2.11-0.4.1.jar","/opt/projects/user/PLT8139/testjars/spark-avro_2.11.4.0.0.jar","/opt/projects/user/PLT8139/testjars/spark-sql-kafka-0-10_2.11-2.1.1.jar","/opt/projects/user/PLT8139/testjars/spark-streaming-kafka-0-10_2.11-
2.1.1.jar","/opt/projects/user/PLT8139/testjars/spark-streaming-kafka-0-10-assembly_2.11-2.1.1.jar","/opt/projects/user/PLT8139/testjars/spark-streaming_2.11-2.1.1.jar",,"/opt/projects/user/PLT8139/testjars/spark-sql_2.11-2.1.1.jar" --driver-memory 3g --executor-memory 3g --num-executors 4 --executor-cores 2

1 Ответ

0 голосов
/ 22 мая 2018

trigger является свойством DataStreamWriter.

Вы вызываете его на populationStream, StreamingQuery возвращаемом методом start.

Если вы хотите установить запрос, замените существующий:

val populationStream = population.
  writeStream.
  format("console").
  trigger(Trigger.Once).
  outputMode(OutputMode.Complete).
  queryName("textStream").start
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...