Как читать Azure CosmosDb Collection в Databricks и записывать в Spark DataFrame - PullRequest
0 голосов
/ 01 мая 2019

Я запрашиваю коллекцию CosmosDb и смог распечатать результаты.Когда я пытаюсь сохранить результаты в Spark DataFrame, происходит сбой.

Ссылка на этот сайт в качестве примера:

Как читать данные из CosmosDB Azure в python

Выполнены точные шаги из приведенной выше ссылки.Кроме того, пробуя приведенное ниже

 df = spark.createDataFrame(dataset)

Выдает эту ошибку:

ValueError: Некоторые типы не могут быть определены после вывода

ValueError Traceback (самый последний вызовпоследний)
in ()
25 печать (набор данных)
26
---> 27 df = spark.createDataFrame (набор данных)
28 df.show ()
29

/ databricks / spark / python / pyspark / sql / session.py в createDataFrame (self, data, schema, samplingRatio, verifySchema)
808 rdd, schema = self._createFromRDD (data.map (prepare), схема, коэффициент выборки)
809 остальное:
-> 810 rdd, схема = self._createFromLocal (map (подготовить, данные), схема)
811 jrdd = self._jvm.SerDeUtil.toJavaArray (rdd._to_java_object_rdd ())
812 jdf = self._jsparkSession.applySchemaToPythonRDD (jrdd.rdd (), schema.json ())

/ databricks / spark / python / pyspark / sql / session.py в_createFromLocal (self, data, schema)
440 запись временных файлов.
441 "" "
-> 442 данных, схема = self._wrap_data_schema (данные, схема)
443 возвращают self._sc.parallelize (data), схема

Но, желая сохранить это как Spark DataFrame

, любая помощь будеточень признателен.спасибо !!!>

Ответы [ 2 ]

0 голосов
/ 09 мая 2019

Я вижу, что вы следовали моему предыдущему ответу, используя старый Python SDK для DocumentDB для запроса документов CosmosDB для создания объекта PySpark DataFrame.Но вы не можете напрямую передать результат docs из метода client.ReadDocuments в качестве параметра data в функцию SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True), поскольку типы данных отличаются, как показано ниже.

Для функции createDataFrame требуется параметр data, который должен быть RDD или list или pandas.DataFrame

enter image description here

Однако,Я скачал исходные коды pydocumentdb-2.3.3.tar.gz из https://pypi.org/project/pydocumentdb/#files и просмотрел файлы кодов document_client.py & query_iterable.py.

# from document_client.py
def ReadDocuments(self, collection_link, feed_options=None):
    """Reads all documents in a collection.

    :param str collection_link:
        The link to the document collection.
    :param dict feed_options:

    :return:
        Query Iterable of Documents.
    :rtype:
        query_iterable.QueryIterable

    """
    if feed_options is None:
        feed_options = {}

    return self.QueryDocuments(collection_link, None, feed_options)

# query_iterable.py
class QueryIterable(object):
    """Represents an iterable object of the query results.
    QueryIterable is a wrapper for query execution context.
    """

Итак, чтобы исправить вашу проблему,сначала нужно создать объект pandas.DataFrame, повторив результат Query Iterable of Documents из метода ReadDocuments, а затем создать объект PySpark DataFrame с помощью spark.createDataFrame(pandas_df).

0 голосов
/ 03 мая 2019

Чтобы вывести тип поля, PySpark просматривает записи non-none в каждом поле.Если в поле есть только записи None, PySpark не может определить тип и выдаст эту ошибку.

Ручное определение схемы решит проблему

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> schema = StructType([StructField("foo", StringType(), True)])
>>> df = spark.createDataFrame([[None]], schema=schema)
>>> df.show()
+----+
|foo |
+----+
|null|
+----+

Надеюсь, это поможет.

...