Я пытаюсь прочитать некоторые данные из файла с использованием структурированной потоковой передачи и, наконец, записать их в Cassandra. Однако я получаю сообщение об ошибке ниже (задолго до написания Кассандры)
"org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"
Вот фрагмент кода, который я использую
val ip15M = spark.readStream.schema(NewsSchema).parquet(INPUT_DIRECTORY)
val dataframeToRowColFunction = new RowToColumn(table) // This seems to work fine
val pairs = ip15M.toJavaRDD.flatMapToPair(dataframeToRowColFunction.FlatMapData) // This fails
// ... Other code
Вот как класс RowToColumn выглядит как
class RowToColumn (var table: Table) extends java.io.Serializable{
val decomposer = new EventDecomposer(table)
val FlatMapData: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() {
//val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() {
println(" in FlatMapData") // This is printed
override def call(x: Row) = {
println(" in Call method") // This is not printed
// Other code ...
}
}
Эта работа отлично работает без потоковой передачи. Кроме того, я посмотрел на другие link1 и link2 , но не решает проблему