Написать потоковый Dataframe Spark в MongoDB - PullRequest
0 голосов
/ 11 июня 2018

У меня есть потоковая Dataset в Spark с определенной схемой.Когда я хочу вычислить запрос по нему, я вызываю:

StreamingQuery query = querydf
                      .writeStream()
                      .outputMode(OutputMode.Update())
                      .format("console")
                      .start();           

query.awaitTermination();

Таким образом, я вижу в консоли результат запроса для каждого триггера.Как я могу написать результат DataFrame в Mongo?Для Straming Dataset невозможно.Должен ли я преобразовать потоковый Dataset в статический Dataset каждый триггер и затем сохранить его?Как я могу это сделать?

1 Ответ

0 голосов
/ 13 июня 2018

Вы можете создать MongoDbSink:

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

class MongoDbSink(options: Map[String, String]) extends Sink with Logging {

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val schema = data.schema
    val rdd = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }

    // write RDD to MongoDB!!
  }
}

class MongoDbSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(sqlContext: SQLContext,
                 parameters: Map[String, String],
                 partitionColumns: Seq[String],
                 outputMode: OutputMode): Sink = {
    new MongoDbSink(parameters)
  }

  def shortName(): String = "my-mongo-sink"
}

И затем реализовать запись в MongoDb, как вам нравится.

В .format() writeStream укажите путь к MongoDbSinkProvider

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...