Сохраните потоковый фрейм данных в MongoDB, используя Spark Scala - PullRequest
0 голосов
/ 25 сентября 2019

Я использую Kakfa и Spark, мой выход (df1) - потоковый Dataframe, я бы хотел сохранить его в MongoDB.Какие-либо предложения?Большое спасибо!

  val df= lines.selectExpr("CAST(value AS STRING)").as[(String)]

  .select(from_json($"value", DFschema).as("data"))
  .select("data.*")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()
 .awaitTermination()

  df1 = df.filter($"COLUMN".isin(listA: _*))

  // save df1 into MongoDB
   //MongoSpark.save()...

1 Ответ

0 голосов
/ 26 сентября 2019

Вот несколько способов взаимодействия с mongodb с помощью spark

val mongodb_input_uri = "mongodb://" + interface + ":" + port + "/" + database + "." + collection
    val mongodb_output_uri = "mongodb://" + interface + ":" + port + "/" + database + "." + collection

    val sparkSession = org.apache.spark.sql.SparkSession.builder
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", mongodb_input_uri)
      .config("spark.mongodb.output.uri", mongodb_output_uri)
.getOrCreate()


def writeData(sparkSession: SparkSession, dataframe : DataFrame)= {
    dataframe.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
  }

  def readData(sparkSession: SparkSession): DataFrame = {
    val data = sparkSession.read.format("com.mongodb.spark.sql.DefaultSource").load()

    data

}

Источник: github_source_code

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...