Хорошо, так что в конце концов, сделайте это. Я думаю, что это не лучший способ сделать это, но это работает
import com.mongodb.client.MongoCollection
import com.mongodb.spark._
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.typesafe.scalalogging.slf4j.LazyLogging
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document
object Test extends App with LazyLogging {
val conf = new SparkConf()
.setAppName("test")
.setMaster("local[*]")
val sc = new SparkContext(conf)
val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/",
"database"
-> "test", "collection" -> "customers"))
val readConfig = ReadConfig(Map( "uri" -> "mongodb://127.0.0.1/",
"database"
-> "test", "collection" -> "customers"))
val rdd = sc.loadFromMongoDB(readConfig)
rdd.map(document => {
val mongoDatabaseConnectionDetails =
document.get("mongoDatabaseConnectionDetails").asInstanceOf[Document]
mongoDatabaseConnectionDetails.replace("street", "azerty")
document.replace("mongoDatabaseConnectionDetails",
mongoDatabaseConnectionDetails)
save(document, writeConfig)
}).collect()
def save(document: Document, writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
mongoConnector.withCollectionDo(writeConfig, { collection:
MongoCollection[Document] => {
val searchDocument = new Document()
searchDocument.append("_id", document.get("_id").asInstanceOf[String])
collection.replaceOne(searchDocument, document)
}
})
}
}