PySpark пытается применить схему предыдущего поля к следующему полю - PullRequest
0 голосов
/ 01 февраля 2019

Наличие этой странной проблемы с PySpark.Кажется, что он пытается применить схему для предыдущего поля, к следующему полю во время его обработки.

Простейший тестовый пример, который я мог придумать:

%pyspark
from pyspark.sql.types import (
    DateType,
    StructType,
    StructField,
    StringType,
)

from datetime import date
from pyspark.sql import Row


schema = StructType(
    [
        StructField("date", DateType(), True),
        StructField("country", StringType(), True),
    ]
)

test = spark.createDataFrame(
    [
        Row(
            date=date(2019, 1, 1),
            country="RU",
        ),
    ],
    schema
)

Stacktrace:

Fail to execute line 26:     schema
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-8579306903394369208.py", line 380, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 26, in <module>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 691, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 423, in _createFromLocal
    data = [schema.toInternal(row) for row in data]
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in toInternal
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 601, in <genexpr>
    for f, v, c in zip(self.fields, obj, self._needConversion))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 439, in toInternal
    return self.dataType.toInternal(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 175, in toInternal
    return d.toordinal() - self.EPOCH_ORDINAL
AttributeError: 'str' object has no attribute 'toordinal'

Дополнительная информация от запуска его локально, а не в Zepplin:

self = DateType, d = 'RU'

    def toInternal(self, d):
        if d is not None:
>           return d.toordinal() - self.EPOCH_ORDINAL
E           AttributeError: 'str' object has no attribute 'toordinal'

Например, он пытается применить DateType к country.Если я избавлюсь от date, это нормально.Если я избавлюсь от country, это нормально.И то, и другое вместе, не стоит.

Есть идеи?Я что-то упускаю из виду?

1 Ответ

0 голосов
/ 01 февраля 2019

Если вы собираетесь использовать список Row s, вам также не нужно указывать схему.Это связано с тем, что Row уже знает схему.

Проблема возникает из-за того, что объект pyspark.sql.Row не поддерживает порядок, указанный для полей.

print(Row(date=date(2019, 1, 1), country="RU"))
#Row(country='RU', date=datetime.date(2019, 1, 1))

Из документов :

Строка может использоваться для создания объекта строки с использованием именованных аргументов, поля будут отсортированы по именам.

Как видите, поле country ставится первым.Когда spark пытается создать DataFrame с указанным schema, он ожидает, что первый элемент будет DateType.

. Один из способов исправить это - поместить поля в schema в алфавитном порядке.:

schema = StructType(
    [
        StructField("country", StringType(), True),
        StructField("date", DateType(), True)
    ]
)

test = spark.createDataFrame(
    [
        Row(date=date(2019, 1, 1), country="RU")
    ],
    schema
)
test.show()
#+-------+----------+
#|country|      date|
#+-------+----------+
#|     RU|2019-01-01|
#+-------+----------+

Или в этом случае нет необходимости даже передавать schema в createDataFrame.Это будет выведено из Row s:

test = spark.createDataFrame(
    [
        Row(date=date(2019, 1, 1), country="RU")
    ]
)

И если вы хотите изменить порядок столбцов, используйте select:

test = test.select("date", "country")
test.show()
#+----------+-------+
#|      date|country|
#+----------+-------+
#|2019-01-01|     RU|
#+----------+-------+
...