Структурированная потоковая передача: запросы с потоковыми источниками должны выполняться с помощью writeStream.start () - PullRequest
0 голосов
/ 31 марта 2020

Я пытаюсь прочитать некоторые данные из файла с использованием структурированной потоковой передачи и, наконец, записать их в Cassandra. Однако я получаю сообщение об ошибке ниже (задолго до написания Кассандры)

"org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"

Вот фрагмент кода, который я использую

val ip15M = spark.readStream.schema(NewsSchema).parquet(INPUT_DIRECTORY)
val dataframeToRowColFunction = new RowToColumn(table) // This seems to work fine

val pairs = ip15M.toJavaRDD.flatMapToPair(dataframeToRowColFunction.FlatMapData) // This fails
// ... Other code

Вот как класс RowToColumn выглядит как

class RowToColumn (var table: Table) extends java.io.Serializable{
  val decomposer = new EventDecomposer(table)

  val FlatMapData: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() {
  //val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() {
    println(" in FlatMapData") // This is printed
    override def call(x: Row) = {
      println(" in Call method") // This is not printed
      // Other code ...
  }

}

Эта работа отлично работает без потоковой передачи. Кроме того, я посмотрел на другие link1 и link2 , но не решает проблему

1 Ответ

1 голос
/ 31 марта 2020

Вы можете подходить к части записи следующим образом, так как я не знаю, есть ли у Cassandra соединитель потоковой передачи для структурированной потоковой передачи в искре:

ip15M
      .writeStream
      .foreachBatch { (df, batchId) => {
        // here apply all of your logic on dataframe
      }
      }
      .start()

Имейте в виду, что в foreach l oop вы имеете дело с dataframe, а не с потоками, и, скорее всего, вы можете сохранить их непосредственно в Cassandra.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...