You can create a MongoDbSink:
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
class MongoDbSink(options: Map[String, String]) extends Sink with Logging {
  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val schema = data.schema
    // Convert Catalyst internal rows to regular Rows using the batch schema,
    // so the data can be written out with a plain RDD.
    val rdd = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }
    // write the RDD to MongoDB
  }
}

class MongoDbSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {
    new MongoDbSink(parameters)
  }

  override def shortName(): String = "my-mongo-sink"
}
Then implement the write to MongoDB however you like.
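For example, here is a minimal sketch of the write itself, assuming the synchronous MongoDB Java driver (mongodb-driver-sync) is on the classpath and that "uri", "database" and "collection" are hypothetical option keys you pass through the writer options; it only handles flat schemas with simple types:

import com.mongodb.client.MongoClients
import org.bson.Document
import scala.collection.JavaConverters._

// Inside addBatch, after building `rdd`:
val opts = options // copy to a local val so the closure does not capture the Sink instance
rdd.foreachPartition { partition =>
  // One client per partition; pooling, retries and error handling omitted for brevity.
  val client = MongoClients.create(opts.getOrElse("uri", "mongodb://localhost:27017"))
  try {
    val collection = client
      .getDatabase(opts("database"))
      .getCollection(opts("collection"))
    val docs = partition.map { row =>
      val doc = new Document()
      // Flat schemas only; nested structs/arrays need extra conversion.
      row.schema.fields.foreach(f => doc.append(f.name, row.getAs[AnyRef](f.name)))
      doc
    }.toList
    if (docs.nonEmpty) collection.insertMany(docs.asJava)
  } finally {
    client.close()
  }
}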
In writeStream's .format(), pass the fully qualified class name of MongoDbSinkProvider.
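A minimal usage sketch, assuming the provider lives in a hypothetical com.example package and the option keys match the ones read by the sink above:

val query = df.writeStream
  .format("com.example.MongoDbSinkProvider")
  .option("uri", "mongodb://localhost:27017")
  .option("database", "test")
  .option("collection", "events")
  .option("checkpointLocation", "/tmp/mongo-sink-checkpoint")
  .outputMode("append")
  .start()

If you additionally register the provider in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, the shortName should work too, i.e. .format("my-mongo-sink").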