Я пытаюсь выполнить следующий код:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoders

// Input row shape for the streaming CSV source.
case class Person(id: Long, name: String, city: String)

// Derive the schema from the case class so the two cannot drift apart.
// NOTE(review): the hand-built StructType that previously shadowed this `schema`
// was removed — Encoders.product[Person].schema produces the same structure
// (id: LongType, name/city: StringType).
val schema = Encoders.product[Person].schema

// Streaming source: each CSV file matching the glob is parsed into Person rows.
val people = spark.readStream.schema(schema).csv("/data/pncdw/scratch/test/*.csv").as[Person]

// Running count of rows per city; an unbounded aggregation, so the sink
// must use Complete output mode.
val population = people.groupBy('city).agg(count('city) as "population")

import scala.concurrent.duration._
// `org.apache.spark.sql.streaming.Trigger` only exists in Spark >= 2.2.0.
// The jars on this classpath are all 2.1.1, which is exactly why the shell
// reports "not found: value Trigger". On 2.1.x the trigger type is the
// `ProcessingTime` case class (deprecated in 2.2+ in favour of Trigger.ProcessingTime).
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}

val populationStream = population.
  writeStream.
  format("console").
  trigger(ProcessingTime(30.seconds)).  // fire every 30s; Trigger.Once would need Spark 2.2+
  outputMode(OutputMode.Complete).      // re-emit the full aggregation table on each trigger
  queryName("textStream").start
Но при его выполнении я получаю следующую ошибку:
ERROR : scala> populationStream.trigger(Trigger.once)
<console>:43: error: not found: value Trigger
populationStream.trigger(Trigger.once)
Я загрузил все jar-файлы и зависимости, как показано ниже.
spark-shell --jars "/opt/projects/user/PLT8139/testjars/spark-xml_2.11-0.4.1.jar","/opt/projects/user/PLT8139/testjars/spark-avro_2.11.4.0.0.jar","/opt/projects/user/PLT8139/testjars/spark-sql-kafka-0-10_2.11-2.1.1.jar","/opt/projects/user/PLT8139/testjars/spark-streaming-kafka-0-10_2.11-
2.1.1.jar","/opt/projects/user/PLT8139/testjars/spark-streaming-kafka-0-10-assembly_2.11-2.1.1.jar","/opt/projects/user/PLT8139/testjars/spark-streaming_2.11-2.1.1.jar","/opt/projects/user/PLT8139/testjars/spark-sql_2.11-2.1.1.jar" --driver-memory 3g --executor-memory 3g --num-executors 4 --executor-cores 2