Разъем Spark Mongo: установка только одного атрибута в подключении MongoDB - PullRequest
0 голосов
/ 30 мая 2018

Допустим, у меня есть следующий документ Mongo:

{
 "_id":1, 
 "age": 10,
 "foo": 20
}

и следующий Spark DataFrame df:

_id | val
 1  | 'a'
 2  | 'b'

, и теперь я хочу добавить val изфрейм данных для документа Mongo ...

Используя соединитель MongoDB Spark , я могу использовать логику по умолчанию *1014* через добавление "_id", что означает, что "_id "в совпадениях между информационным фреймом Spark и документом Mongo, соединитель Mongo не будет создавать новый документ, а будет обновлять старый.

Но!Обновление в основном ведет себя как replace - если я сделаю следующее:

df
.write.format("com.mongodb.spark.sql.DefaultSource")
.mode("append")
.option('spark.mongodb.output.uri','mongodb://mongo_server:27017/testdb.test_collection')
.save()

Коллекция будет выглядеть так:

[   
    {
     "_id":1, 
     "val": 'a'
    },
   {
     "_id":2, 
     "val':'b' 
    }
]

, и я хотел бы получить это:

[   
    {
     "_id":1, 
     "age": 10,
     "foo": 20
     "val": 'a'
    },
   {
     "_id":2, 
     "val':'b' 
    }
]

Мои вопросы :

  • Есть ли способ (некоторая опция), чтобы заставить разъем Spark вести себя так,Я хочу это вести себя?

  • Конечно, я могу сначала прочитать документы из Mongo в Spark, обогатить их атрибутом «val» и записать / добавить обратно в Mongo.Что такое ввод / вывод этой операции?Это полная загрузка (чтение всех документов и затем замена всех атрибутов) или это несколько умно (например, чтение всех документов, но добавление только атрибута «val», а не замена всего документа)?

1 Ответ

0 голосов
/ 04 июня 2018

есть ли способ (какая-либо опция), чтобы заставить разъем Spark вести себя так, как я хочу?

Да, вы можете установить replaceDocument на false.Например, используя соединитель MongoDB для Spark v2.2.2 и Apache Spark v2.3 в Python:

df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
               .option("spark.mongodb.input.uri", "mongodb://host101:27017/dbName.collName").load()
df.first() 
> Row(_id=1.0, age=10.0, foo=20.0)

# Create a dataframe 
d = {'_id': [1, 2], 'val':['a', 'b']}
inputdf = pd.DataFrame(data=d) 
sparkdf = sqlContext.createDataFrame(inputdf)

# Write to Spark 
sparkdf.write.format("com.mongodb.spark.sql.DefaultSource")
             .mode("append").option("spark.mongodb.output.uri", "mongodb://host101:27017/dbName.collName")
             .option("replaceDocument", "false")
             .save()

# Result 
+---+----+----+---+
|_id| age| foo|val|
+---+----+----+---+
|1.0|10.0|20.0|  a|
|2.0|null|null|  b|
+---+----+----+---+
...