Pyspark создает столбец массива определенной длины из существующего столбца массива - PullRequest
1 голос
/ 03 мая 2020

У меня есть фрейм данных pyspark df, такой как

+-----+----+------------+------------+-------------+------------+
| Name| Age| P_Attribute|S_Attributes|P_Values     |S_values    | 
+-----+----+------------+------------+-------------+------------+
| Bob1| 16 |  [x1,x2]   |     [x1,x3]|["ab",1]     | [1,2]      |
| Bob2| 16 |[x1,x2,x3]  |     []     |["a","b","c"]| []         |
+-----+----+------------+------------+-------------+------------+

Я хотел бы завершить создание df, как показано ниже,

+-----+----+------------+------------+
| Name| Age| Attribute  |      Values|
+-----+----+------------+------------+
| Bob1| 16 |  x1        |     ab     |
| Bob1| 16 |  x2        |     1      |
| Bob1| 16 |  x1        |     1      |
| Bob1| 16 |  x3        |     2      |
| Bob2| 16 |  x1        |     a      |
| Bob2| 16 |  x2        |     b      |
| Bob2| 16 |  x3        |     c      |
+-----+----+------------+------------+

В основном я хочу объединить эти 2 столбца и разбить их на строк. С помощью функций массива pyspark мне удалось объединить массивы и взорвать их, но позже выявить разницу между профессиональными атрибутами и спортивными атрибутами, поскольку они могут иметь одинаковые имена. Мне также нужен столбец типа

+-----+----+------------+------------+------------+
| Name| Age|   Attribute|       type |Value       |
+-----+----+------------+------------+------------+
| Bob1| 16 |  x1        |     1      | ab         |
| Bob1| 16 |  x2        |     1      | 1          |
| Bob1| 16 |  x1        |     2      | 1          |
| Bob1| 16 |  x3        |     2      | 2          |
| Bob2| 16 |  x1        |     1      | a          |
| Bob2| 16 |  x2        |     1      | b          |
| Bob2| 16 |  x3        |     1      | c          |
+-----+----+------------+------------+------------+  

Поэтому я подумал создать столбцы отдельного массива изначально как

+-----+----+------------+------------+------------+------------+
| Name| Age| P_Attribute|S_Attributes|P_type      |S_type      |
+-----+----+------------+------------+------------+------------+
| Bob1| 16 |  [x1,x2]   |     [x1,x3]|   [1,1]    | [2,2]      |
| Bob2| 16 |[x1,x2,x3]  |     []     |  [1,1,1]   |  []        |
+-----+----+------------+------------+------------+------------+

, чтобы я мог объединять столбцы и разбивать столбцы требуемого типа а также как показано выше df. Проблема в том, что я не могу динамически создавать столбцы P_type и S_type. Я попробовал ниже код,

new_df = df.withColumn("temp_P_type", F.lit(1))\
                .withColumn("P_type", F.array_repeat("temp_P_type",F.size("P_Attribute")))

Это выдает TypeError: Column is not iterable ошибка. это также не работает, если длина столбца уже извлечена как другой столбец. Кто-нибудь может мне помочь с этим или есть ли лучшее решение для этого? Можно ли сделать это как уровень df без перехода к функциям RDD и python (без UDF)?

PS Я использую свечу 2,4

Ответы [ 2 ]

1 голос
/ 03 мая 2020

Я бы предложил использовать функцию более высокого порядка transform, с struct и array_union, а затем explode once и просто выберите оба, используя .* expansion..

df.show()
#+----+---+------------+------------+
#|Name|Age| P_Attribute|S_Attributes|
#+----+---+------------+------------+
#|Bob1| 16|    [x1, x2]|    [x1, x3]|
#|Bob2| 16|[x1, x2, x3]|          []|
#+----+---+------------+------------+

from pyspark.sql import functions as F
df.withColumn("Attributes", F.explode(F.array_union(F.expr("""transform(P_Attribute,x-> struct(x as Attribute,1 as Type))"""),\
              F.expr("""transform(S_Attributes,x-> struct(x as Attribute,2 as Type))"""))))\
   .select("Name", "Age", "Attributes.*").show()

#+----+---+---------+----+
#|Name|Age|Attribute|Type|
#+----+---+---------+----+
#|Bob1| 16|       x1|   1|
#|Bob1| 16|       x2|   1|
#|Bob1| 16|       x1|   2|
#|Bob1| 16|       x3|   2|
#|Bob2| 16|       x1|   1|
#|Bob2| 16|       x2|   1|
#|Bob2| 16|       x3|   1|
#+----+---+---------+----+

UPDATE:

df.show()

#+----+---+------------+------------+---------+--------+
#|Name|Age| P_Attribute|S_Attributes| P_Values|S_values|
#+----+---+------------+------------+---------+--------+
#|Bob1| 16|    [x1, x2]|    [x1, x3]|  [ab, 1]|  [1, 2]|
#|Bob2| 16|[x1, x2, x3]|          []|[a, b, c]|      []|
#+----+---+------------+------------+---------+--------+

from pyspark.sql import functions as F
df.withColumn("Attributes", F.explode(F.array_union\
               (F.expr("""transform(arrays_zip(P_Attribute,P_Values),x->\
                          struct(x.P_Attribute as Attribute,1 as Type,string(x.P_Values) as Value))"""),\
                F.expr("""transform(arrays_zip(S_Attributes,S_Values),x->\
                          struct(x.S_Attributes as Attribute,2 as Type,string(x.S_Values) as Value))"""))))\
   .select("Name", "Age", "Attributes.*").show()

#+----+---+---------+----+-----+
#|Name|Age|Attribute|Type|Value|
#+----+---+---------+----+-----+
#|Bob1| 16|       x1|   1|   ab|
#|Bob1| 16|       x2|   1|    1|
#|Bob1| 16|       x1|   2|    1|
#|Bob1| 16|       x3|   2|    2|
#|Bob2| 16|       x1|   1|    a|
#|Bob2| 16|       x2|   1|    b|
#|Bob2| 16|       x3|   1|    c|
#+----+---+---------+----+-----+
1 голос
/ 03 мая 2020

Вы можете сделать что-то следующим образом. Сначала соберите P_attributes и S_attributes в один столбец Attributes, затем выполните posexplode для него, это должно дать столбец type, который ссылается на источник атрибутов (P или S) как тебе нужно было. Наконец explode столбец Attributes для выравнивания всех атрибутов.

import pyspark.sql.functions as f

df = spark.createDataFrame([
    ['Bob1', 16, ['x1', 'x2'], ['x1', 'x3']],
    ['Bob2', 16, ['x1', 'x2', 'x3'], []]],
    ['Name', 'Age', 'P_Attribute', 'S_Attributes'])

df.withColumn('Attributes', f.array('P_Attribute', 'S_Attributes'))\
  .select('Name', 'Age', f.posexplode('Attributes').alias('type', 'Attribute'))\
  .withColumn('Attribute', f.explode('Attribute'))\
  .show()

+----+---+----+---------+
|Name|Age|type|Attribute|
+----+---+----+---------+
|Bob1| 16|   0|       x1|
|Bob1| 16|   0|       x2|
|Bob1| 16|   1|       x1|
|Bob1| 16|   1|       x3|
|Bob2| 16|   0|       x1|
|Bob2| 16|   0|       x2|
|Bob2| 16|   0|       x3|
+----+---+----+---------+
...