UPD - для Spark 2.2.0
Вы можете определить аналогичные функции в 2.2.0 с помощью udfs. Они будут намного менее эффективны с точки зрения производительности, и вам понадобится специальная функция для каждого типа выходного значения (т.е. у вас не будет одной функции element_at
, которая могла бы выводить значение любого типа из любого типа карты) , но они будут работать. Приведенный ниже код работает для Spark 2.2.0:
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, ArrayType, StringType
@udf(MapType(StringType(), StringType()))
def map_from_entries(l):
return {x:y for x,y in l}
@udf(MapType(StringType(), StringType()))
def map_concat(m1, m2):
m1.update(m2)
return m1
@udf(ArrayType(StringType()))
def map_keys(m):
return list(m.keys())
def element_getter(k):
@udf(StringType())
def element_at(m):
return m.get(k)
return element_at
d2 = d1.select('id',
map_concat(map_from_entries('list1'),
map_from_entries('list2')).alias('merged_map'))
map_keys = d2.select(f.explode(map_keys('merged_map')).alias('mk')) \
.agg(f.collect_set('mk').alias('keys')) \
.collect()[0].keys
map_keys = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
selects = [element_getter(k)('merged_map').alias(k) for k in sorted(map_keys)]
d = d2.select('id', *selects)
ОРИГИНАЛЬНЫЙ ОТВЕТ (работает для Spark 2.4.0 +)
Не ясно, откуда появился столбец d
from в вашем примере (d
никогда не появлялся в исходном фрейме данных). Если столбцы должны быть созданы на основе первых элементов в массиве, тогда это должно работать (при условии, что общее количество уникальных первых значений в списках достаточно мало):
import pyspark.sql.functions as f
d2 = d1.select('id',
f.map_concat(f.map_from_entries('list1'),
f.map_from_entries('list2')).alias('merged_map'))
map_keys = d2.select(f.explode(f.map_keys('merged_map')).alias('mk')) \
.agg(f.collect_set('mk').alias('keys')) \
.collect()[0].keys
selects = [f.element_at('merged_map', k).alias(k) for k in sorted(map_keys)]
d = d2.select('id', *selects)
Вывод (нет столбца для d
потому что он никогда не упоминался в начальном DataFrame):
+---+----+----+----+----+----+----+
| id| a| b| c| e| f| g|
+---+----+----+----+----+----+----+
| 1| av| bv|null| ev| fv|null|
| 2|null|null| cv|null|null| gv|
+---+----+----+----+----+----+----+
Если вы действительно имели в виду, что список столбцов фиксирован с самого начала (и они не берутся из массива), тогда вы можете просто замените определение varaible map_keys
фиксированным списком столбцов, например map_keys=['a', 'b', 'c', 'd', 'e', 'f', 'g']
. В этом случае вы получите результат, упомянутый в ответе:
+---+----+----+----+----+----+----+----+
| id| a| b| c| d| e| f| g|
+---+----+----+----+----+----+----+----+
| 1| av| bv|null|null| ev| fv|null|
| 2|null|null| cv|null|null|null| gv|
+---+----+----+----+----+----+----+----+
Кстати, вы хотите сделать не то, что называется explode
в Spark. explode
в Spark предназначен для ситуации, когда вы создаете несколько строк из одной. Например, если вы хотите получить из фрейма данных вот так:
+---+---------+
| id| arr|
+---+---------+
| 1| [a, b]|
| 2|[c, d, e]|
+---+---------+
до этого:
+---+-------+
| id|element|
+---+-------+
| 1| a|
| 1| b|
| 2| c|
| 2| d|
| 2| e|
+---+-------+