Неоднозначное поведение при добавлении нового столбца в StructType - PullRequest
0 голосов
/ 11 сентября 2018

Я определил функцию в PySpark, которая -

def add_ids(X):
    schema_new = X.schema.add("id_col", LongType(), False)
    _X = X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
    cols_arranged = [_X.columns[-1]] + _X.columns[0:len(_X.columns) - 1]
    return _X.select(*cols_arranged)

В приведенной выше функции я создаю новый столбец (с именем id_col), который добавляется к фрейму данных, который в основном представляет собой просто порядковый номер каждой строки, и, наконец, перемещает id_col в крайняя левая сторона

Данные, которые я использую

>>> X.show(4)
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 4 rows

Вывод функции

>>> add_ids(X).show(4)
+------+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|id_col|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+------+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|     0|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|     1|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|     2|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|     3|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
+------+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 4 rows

Все это прекрасно работает, но проблема в том, что я запускаю следующие две команды

>>> X.show(4)
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 4 rows

>>> X.columns
['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome', 'id_col']

Если вы посмотрите на результат X.columns, вы заметите id_col в конце. Но когда я запустил строку X.show(4) ранее, она не показала id_col в виде столбца.

Теперь, когда я пытаюсь запустить add_ids(X).show(4), я получаю следующую ошибку

pyspark.sql.utils.AnalysisException: "Reference 'id_col' is ambiguous, could be: id_col, id_col.;"

Что я делаю не так?

1 Ответ

0 голосов
/ 11 сентября 2018

Ошибка здесь:

schema_new = X.schema.add("id_col", LongType(), False)

Если вы отметите источник , вы увидите, что метод add изменяет данные на месте.

Это проще увидеть на упрощенном примере:

from pyspark.sql.types import *

schema = StructType()
schema.add(StructField("foo", IntegerType()))

schema
StructType(List(StructField(foo,IntegerType,true)))

Как видите, объект schema был изменен.

Вместо использования метода add необходимо перестроить схему:

schema_new = StructType(schema.fields + [StructField("id_col", LongType(), False)])

В качестве альтернативы вы можете создать глубокую копию объекта:

import copy

old_schema = StructType()
new_schehma = copy.deepcopy(old_schema).add(StructField("foo", IntegerType()))

old_schema
StructType(List())
new_schehma
StructType(List(StructField(foo,IntegerType,true)))
...