Странное поведение при использовании функции toDF () для преобразования RDD в Dataframe в PySpark - PullRequest
0 голосов
/ 31 октября 2018

Я новичок в Spark. И когда я использую функцию toDF () для преобразования RDD в dataframe, кажется, что она вычисляет все функции преобразования, такие как map (), которые я написал ранее. Интересно, является ли toDF () в PySpark преобразованием или действием.

Я создаю простой RDD и использую простую функцию для вывода его значения, только для проверки, и использую toDF () после map (). В результате кажется, что функция запускается в карте частично. И когда я показываю результат dataframe, toDF () действует как преобразование и выдает результат снова.

>>> a = sc.parallelize([(1,),(2,),(3,)])
>>> def f(x):
...     print(x[0])
...     return (x[0] + 1, )
...
>>> b = a.map(f).toDF(["id"])
2
1
>>> b = a.map(f).toDF(["id"]).show()
2
1
1
2
3
+---+
| id|
+---+
|  2|
|  3|
|  4|
+---+

Может кто-нибудь сказать мне, почему функция toDF () в PySpark действует как действие и преобразование? Большое спасибо.

PS: В Scala toDF действуют как трансформация в моем случае.

1 Ответ

0 голосов
/ 31 октября 2018

Это не странно. Поскольку вы не предоставили схему, Spark должен определить ее на основе данных. Если RDD является входом, он будет вызывать SparkSession._createFromRDD, а затем SparkSession._inferSchema, который, если отсутствует samplingRatio, оценивает до 100 строка : * +1011 *

first = rdd.first()
if not first:
    raise ValueError("The first row in RDD is empty, "
                     "can not infer schema")
if type(first) is dict:
    warnings.warn("Using RDD of dict to inferSchema is deprecated. "
                  "Use pyspark.sql.Row instead")


if samplingRatio is None:
    schema = _infer_schema(first, names=names)
    if _has_nulltype(schema):
        for row in rdd.take(100)[1:]:
            schema = _merge_type(schema, _infer_schema(row, names=names))
            if not _has_nulltype(schema):
                break
        else:
            raise ValueError("Some of types cannot be determined by the "
                             "first 100 rows, please try again with sampling")

Теперь остается только загадка, если она не оценивает ровно одну запись. Ведь в вашем случае first не является пустым и не содержит None.

Это потому, что first реализован через take и не гарантирует, что будет оценено точное количество предметов. Если первый раздел не даст нужного количества элементов, он будет многократно увеличивать количество сканируемых разделов. Пожалуйста, проверьте реализацию для деталей.

Если вы хотите избежать этого, вы должны использовать createDataFrame и предоставить схему в виде строки DDL:

spark.createDataFrame(a.map(f), "val: integer")

или эквивалент StructType.

Подобного поведения вы не найдете в аналоге Scala, поскольку он не использует логический вывод схемы в toDF. Он либо извлекает соответствующую схему из Encoder (которая выбирается с использованием отражения Scala), либо вообще не разрешает преобразование. Наиболее близким подобным поведением является вывод на входном источнике, таком как CSV или JSON :

spark.read.json(Seq("""{"foo": "bar"}""").toDS.map(x => { println(x); x }))
...