искры сохраняются документ после его изменения - PullRequest
0 голосов
/ 25 апреля 2018

Я очень новичок в Spark и не могу найти способ сохранить измененный документ в базе данных:

import com.mongodb.spark._
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.typesafe.scalalogging.slf4j.LazyLogging
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document


object Test extends App with LazyLogging {

val conf = new SparkConf()
.setAppName("test")
.setMaster("local[*]")

val sc = new SparkContext(conf)
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/", "database" -> "test", "collection" -> "customers"))
val rdd = sc.loadFromMongoDB(readConfig) //.toDF()


val logs = rdd.foreach {
  document => {
    val mongoDatabaseConnectionDetails = 
    document.get("address").asInstanceOf[Document]
    mongoDatabaseConnectionDetails.put("street", "azerty")
  }
}


val writeConfig = WriteConfig(
  Map(
    "uri" -> "mongodb://127.0.0.1/",
    "database" -> "test",
    "collection" -> "customers",
    "writeConcern.w" -> "majority"
  ))

// ??

}

После изменения документа я бы хотел заменить его в базе данных.

Заранее спасибо

1 Ответ

0 голосов
/ 26 апреля 2018

Хорошо, так что в конце концов, сделайте это. Я думаю, что это не лучший способ сделать это, но это работает

import com.mongodb.client.MongoCollection
import com.mongodb.spark._
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.typesafe.scalalogging.slf4j.LazyLogging
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document


object Test extends App with LazyLogging {

  val conf = new SparkConf()
    .setAppName("test")
    .setMaster("local[*]")

  val sc = new SparkContext(conf)

  val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/", 
    "database" 
    -> "test", "collection" -> "customers"))

  val readConfig =  ReadConfig(Map( "uri" -> "mongodb://127.0.0.1/", 
    "database" 
    -> "test", "collection" -> "customers"))

  val rdd = sc.loadFromMongoDB(readConfig)
  rdd.map(document => {
    val mongoDatabaseConnectionDetails = 
      document.get("mongoDatabaseConnectionDetails").asInstanceOf[Document]
    mongoDatabaseConnectionDetails.replace("street", "azerty")
    document.replace("mongoDatabaseConnectionDetails", 
      mongoDatabaseConnectionDetails)

    save(document, writeConfig)
  }).collect()


  def save(document: Document, writeConfig: WriteConfig): Unit = {
    val mongoConnector = MongoConnector(writeConfig.asOptions)
    mongoConnector.withCollectionDo(writeConfig, { collection: 
      MongoCollection[Document] => {
    val searchDocument = new Document()
    searchDocument.append("_id", document.get("_id").asInstanceOf[String])
    collection.replaceOne(searchDocument, document)
  }

})
}
}
...