Сводите массив структур в столбцы с помощью pyspark - не взрывайте массив - PullRequest
2 голосов
/ 29 мая 2020

В настоящее время у меня есть фрейм данных с идентификатором и столбцом, который массив структур :

 root
 |-- id: string (nullable = true)
 |-- lists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

Вот пример таблицы с данными:

 id | list1             | list2
 ------------------------------------------
 1  | [[a, av], [b, bv]]| [[e, ev], [f,fv]]
 2  | [[c, cv]]         | [[g,gv]]

Как мне преобразовать вышеуказанный фрейм данных в тот, что ниже? Мне нужно «взорвать» массив и добавить столбцы на основе первого значения в структуре.

 id | a   | b   | c   | d   | e  | f  | g  
 ----------------------------------------
 1  | av  | bv  | null| null| ev | fv | null
 2  | null| null| cv  | null|null|null|gv

Код pyspark для создания фрейма данных выглядит следующим образом:

d1 = spark.createDataFrame([("1", [("a","av"),("b","bv")], [("e", "ev"), ("f", "fv")]), \
                                    ("2", [("c", "cv")],  [("g", "gv")])], ["id","list1","list2"])

Примечание: У меня есть искровая версия 2.2.0, поэтому некоторые sql функции не работают, например concat_map, et c.

Ответы [ 2 ]

3 голосов
/ 29 мая 2020

Вы можете сделать это, используя функции порядка hogher без разделения массивов, например:

d1.select('id',
          f.when(f.size(f.expr('''filter(list1,x->x._1='a')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='a'),value->value._2)'''))).alias('a'),\
          f.when(f.size(f.expr('''filter(list1,x->x._1='b')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='b'),value->value._2)'''))).alias('b'),\
          f.when(f.size(f.expr('''filter(list1,x->x._1='c')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='c'),value->value._2)'''))).alias('c'),\
          f.when(f.size(f.expr('''filter(list1,x->x._1='d')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='d'),value->value._2)'''))).alias('d'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='e')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='e'),value->value._2)'''))).alias('e'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='f')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='f'),value->value._2)'''))).alias('f'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='g')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='g'),value->value._2)'''))).alias('g'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='h')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='h'),value->value._2)'''))).alias('h')\
          ).show()


+---+----+----+----+----+----+----+----+----+
| id|   a|   b|   c|   d|   e|   f|   g|   h|
+---+----+----+----+----+----+----+----+----+
|  1|  av|  bv|null|null|  ev|  fv|null|null|
|  2|null|null|  cv|null|null|null|  gv|null|
+---+----+----+----+----+----+----+----+----+

Надеюсь, это поможет

2 голосов
/ 29 мая 2020

UPD - для Spark 2.2.0

Вы можете определить аналогичные функции в 2.2.0 с помощью udfs. Они будут намного менее эффективны с точки зрения производительности, и вам понадобится специальная функция для каждого типа выходного значения (т.е. у вас не будет одной функции element_at, которая могла бы выводить значение любого типа из любого типа карты) , но они будут работать. Приведенный ниже код работает для Spark 2.2.0:

from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, ArrayType, StringType

@udf(MapType(StringType(), StringType()))
def map_from_entries(l):
    return {x:y for x,y in l}

@udf(MapType(StringType(), StringType()))
def map_concat(m1, m2):
    m1.update(m2)
    return m1

@udf(ArrayType(StringType()))
def map_keys(m):
    return list(m.keys())

def element_getter(k):
    @udf(StringType())
    def element_at(m):
        return m.get(k)
    return element_at

d2 = d1.select('id',
               map_concat(map_from_entries('list1'),
                          map_from_entries('list2')).alias('merged_map'))
map_keys = d2.select(f.explode(map_keys('merged_map')).alias('mk')) \
             .agg(f.collect_set('mk').alias('keys')) \
             .collect()[0].keys
map_keys = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
selects = [element_getter(k)('merged_map').alias(k) for k in sorted(map_keys)]
d = d2.select('id', *selects) 

ОРИГИНАЛЬНЫЙ ОТВЕТ (работает для Spark 2.4.0 +)

Не ясно, откуда появился столбец d from в вашем примере (d никогда не появлялся в исходном фрейме данных). Если столбцы должны быть созданы на основе первых элементов в массиве, тогда это должно работать (при условии, что общее количество уникальных первых значений в списках достаточно мало):

import pyspark.sql.functions as f
d2 = d1.select('id',
               f.map_concat(f.map_from_entries('list1'),
                            f.map_from_entries('list2')).alias('merged_map'))
map_keys = d2.select(f.explode(f.map_keys('merged_map')).alias('mk')) \
             .agg(f.collect_set('mk').alias('keys')) \
             .collect()[0].keys
selects = [f.element_at('merged_map', k).alias(k) for k in sorted(map_keys)]
d = d2.select('id', *selects)

Вывод (нет столбца для d потому что он никогда не упоминался в начальном DataFrame):

+---+----+----+----+----+----+----+
| id|   a|   b|   c|   e|   f|   g|
+---+----+----+----+----+----+----+
|  1|  av|  bv|null|  ev|  fv|null|
|  2|null|null|  cv|null|null|  gv|
+---+----+----+----+----+----+----+

Если вы действительно имели в виду, что список столбцов фиксирован с самого начала (и они не берутся из массива), тогда вы можете просто замените определение varaible map_keys фиксированным списком столбцов, например map_keys=['a', 'b', 'c', 'd', 'e', 'f', 'g']. В этом случае вы получите результат, упомянутый в ответе:

+---+----+----+----+----+----+----+----+
| id|   a|   b|   c|   d|   e|   f|   g|
+---+----+----+----+----+----+----+----+
|  1|  av|  bv|null|null|  ev|  fv|null|
|  2|null|null|  cv|null|null|null|  gv|
+---+----+----+----+----+----+----+----+

Кстати, вы хотите сделать не то, что называется explode в Spark. explode в Spark предназначен для ситуации, когда вы создаете несколько строк из одной. Например, если вы хотите получить из фрейма данных вот так:

+---+---------+
| id|      arr|
+---+---------+
|  1|   [a, b]|
|  2|[c, d, e]|
+---+---------+

до этого:

+---+-------+
| id|element|
+---+-------+
|  1|      a|
|  1|      b|
|  2|      c|
|  2|      d|
|  2|      e|
+---+-------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...