Добавление элемента List в виде столбца в существующий фрейм данных pyspark - PullRequest
1 голос
/ 01 октября 2019

У меня есть список lists=[0,1,2,3,5,6,7]. Заказ не последовательный. У меня есть фрейм данных pyspark с 9 столбцами.

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|    NaN|           NaN|    1| NaN |

Мне нужно добавить мои списки в виде столбца к существующему фрейму данных. Мои списки не в порядке, поэтому я не могу использовать udf. Есть ли способ сделать это? Пожалуйста, помогите мне, я хочу, чтобы это было так

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+------+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|lists |
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+-------+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|0     |
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|           NaN|    1| NaN |1     |

Ответы [ 2 ]

1 голос
/ 03 октября 2019

Не слишком уверен, должно ли это быть что-то подобное или вы ожидали чего-то другого. Если ваше количество элементов списка и строк данных должно быть одинаковым, то вот простой подход.

Для данного образца данных с тремя столбцами:

 l = [(1,'DEF',33),(2,'KLM',22),(3,'ABC',32),(4,'XYZ',77)]
 df=spark.createDataFrame(l, ['id', 'value','age'])

Допустим, вот список:

lists=[5,6,7,8]

Можно создать rdd из этого списка и использовать zipработать с данным фреймом и использовать для него функцию map.

listrdd = sc.parallelize(lists)

newdf=df.rdd.zip(listrdd).map(lambda (x,y ) : ([x for x in x] + [y])).toDF(["id", "Value",",age","List_element"])

>>> ziprdd=df.rdd.zip(listrdd)
>>> ziprdd.take(50)
[(Row(id=1, value=u'DEF', age=33), 5), (Row(id=2, value=u'KLM', age=22), 6), (Row(id=3, value=u'ABC', age=32), 7), (Row(id=4, value=u'XYZ', age=77), 8)]

Поскольку zip-функция возвращает пары значений ключей, первый элемент которых содержит данные из первого rdd, а второй элемент содержит данные из второго rdd. Я использую списочный анализ для первого элемента и объединяю его со вторым элементом.

Он динамический и может работать для n столбцов, но элементы списка и строки данных должны быть одинаковыми.

>>> newdf.show()
]+---+-----+----+------------+
| id|Value|,age|List_element|
+---+-----+----+------------+
|  1|  DEF|  33|           5|
|  2|  KLM|  22|           6|
|  3|  ABC|  32|           7|
|  4|  XYZ|  77|           8|
+---+-----+----+------------+

Примечание: При использовании метода zip оба счетчика разделов rdd должны быть одинаковыми, иначе вы получите ошибку

ValueError: Can only zip with RDD which has the same number of partitions
0 голосов
/ 02 октября 2019

вы можете join два dfs, например:

df2 = spark.createDataFrame()
df= df.join(df2, on=['index']).drop('index')

df2 будет содержать столбцы, которые вы хотите добавить к основному df.

...