Как преобразовать CollectionAccumulator [(Double, Double)] в SparkdataFrame? - PullRequest
0 голосов
/ 13 июня 2018

У меня есть org.apache.spark.util.CollectionAccumulator[(Double, Double)] И я добавил в него строки во время потоковой передачи.

Теперь я хочу преобразовать его в DataFrame для дальнейшей обработки.Но я не уверен, как этого добиться.

Редактировать

Добавление фрагмента кода для заполнения аккумулятора:

val strmquery = dataFramedummy.writeStream.foreach(new ForeachWriter[Row]() {

  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(row: Row): Unit = {
    println(s">> Processing ${row}")
    accumulator.add((row.getAs("Field1").asInstanceOf[Double], row.getAs("Filed2").asInstanceOf[Double]))
  }

  override def close(errorOrNull: Throwable): Unit = {
    // do nothing
  }
}).outputMode("append").start()

1 Ответ

0 голосов
/ 13 июня 2018

Преобразуйте свой аккумулятор в список и затем создайте из него набор данных.

val accumulator :org.apache.spark.util.CollectionAccumulator[(Double, Double)] = ???
spark.createDataset(accumulator.value)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...