Вставьте несколько строк из pyspark в cosmosdb - PullRequest
0 голосов
/ 03 сентября 2018

Я пытаюсь вставить более одной строки в фрейм данных в pyspark. Это мой код:

Сначала я импортирую пакеты:

import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents

Затем я определяю соединение Policy:

connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery
connectionPolicy.PreferredLocations = {"Western Europe"}

Полномочия:

masterKey = 'yourmasterkey'
host = 'https://testcosmosdbasdada.documents.azure.com:443/'
client = document_client.DocumentClient(host,{'masterKey': masterKey}, connectionPolicy)

Затем я определяю имя базы данных и коллекции:

databaseId = 'pruebadb'
collectionId = 'collection1'

dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId

Примечание. Мне нужно создать базу данных и коллекцию с этими именами в наборе Azure. Тогда я могу использовать либо CreateDocument, либо UpsertDocument. Я собираюсь использовать UpsertDocument.

client.UpsertDocument(collLink,{'attribute1': 4}, options=None)

Это работает! Как вы видите в документации: https://docs.microsoft.com/en-us/python/api/pydocumentdb/pydocumentdb.document_client.documentclient?view=azure-python#upsertdocument

Однако я не знаю, как вставить несколько строк одновременно. Эти доказательства не работают:

1)

client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)

Объект 'list' не имеет атрибута 'get'

2)

client.UpsertDocument(collLink,[{'attribute1': 4},{'attribute1': 2}], options=None)

Объект 'list' не имеет атрибута 'get'

3)

df = spark.read.csv('/FileStore/tables/points.csv', sep=';', header=True)
client.UpsertDocument(collLink, df, options=None)

Объект 'list' не имеет атрибута 'get'

Эти доказательства не работают, потому что мне нужен dict в качестве второго аргумента UpsertDocument ().

Есть какая-нибудь функция pydocumentdb или другой библиотеки python для этого?

Каков наилучший метод производительности для вставки данных из блока данных в CosmosDB с помощью pyspark?

Ответы [ 2 ]

0 голосов
/ 05 сентября 2018

Благодаря Сивапрасанне Сетураман я начал расследование. Не нужно использовать MongoDB. Наконец я нашел: https://github.com/Azure/azure-cosmosdb-spark

Будьте внимательны, чтобы использовать режим append, если вам нужно вставить поверх непустого кадра данных:

writeConfig = {
 "Endpoint" : "yourhostcosmosdb",
 "Masterkey" : "yourmasterkey",
 "Database" : "pruebadb",
 "Collection" : "collection1",
}
df.write.format("com.microsoft.azure.cosmosdb.spark").mode('append').options(**writeConfig).save()
0 голосов
/ 05 сентября 2018

Для этого вы можете использовать DataFrameWriter API, предлагаемый соединителем Spark MongoDB, вместо того, чтобы полагаться на API CosmosDB.

Следующий код должен работать:

df.write.format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri", "<CosmosDB URI>")\
        .option("database","CosmosDB Database Name")\
        .option("collection","CosmosDB Collection Name")\
        .mode("append").save()

Вам потребуется добавить соединитель Spark-MongoDB в ваш путь к классу, используя аргумент --jars или --packages в вашей команде spark-submit.

Пример: spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 <YOUR_SRC_FILE>.py

Подробнее об API DataFrameWriter можно узнать по адресу: http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

...