Этот код достаточно надежен, чтобы принимать любое количество элементов в массивах. Хотя OP
имеет 3 элемента в каждом массиве. Мы начинаем с создания указанного DataFrame
.
# Loading requisite packages.
from pyspark.sql.functions import col, explode, first, udf
df = sqlContext.createDataFrame([(['val1', 'val2', 'val3'],),
(['val4', 'val5', 'val6'],),
(['val7', 'val8', 'val9'],)],['ColA',])
df.show()
+------------------+
| ColA|
+------------------+
|[val1, val2, val3]|
|[val4, val5, val6]|
|[val7, val8, val9]|
+------------------+
Поскольку мы хотим, чтобы каждый элемент отдельного массива был помечен как соответствующий столбец, поэтому в качестве первого шага мы попытаемся сопоставить имя столбца. и значение. Для этого мы создаем user defined function
- (UDF
) .
def func(c):
return [['Col'+str(i+1),c[i]] for i in range(len(c))]
func_udf = udf(func,ArrayType(StructType([
StructField('a', StringType()),
StructField('b', StringType())
])))
df = df.withColumn('ColA_new',func_udf(col('ColA')))
df.show(truncate=False)
+------------------+---------------------------------------+
|ColA |ColA_new |
+------------------+---------------------------------------+
|[val1, val2, val3]|[[Col1,val1], [Col2,val2], [Col3,val3]]|
|[val4, val5, val6]|[[Col1,val4], [Col2,val5], [Col3,val6]]|
|[val7, val8, val9]|[[Col1,val7], [Col2,val8], [Col3,val9]]|
+------------------+---------------------------------------+
Как только это будет сделано, мы explode
DataFrame.
# Step 1: Explode the DataFrame
df=df.withColumn('vals', explode('ColA_new')).drop('ColA_new')
df.show()
+------------------+-----------+
| ColA| vals|
+------------------+-----------+
|[val1, val2, val3]|[Col1,val1]|
|[val1, val2, val3]|[Col2,val2]|
|[val1, val2, val3]|[Col3,val3]|
|[val4, val5, val6]|[Col1,val4]|
|[val4, val5, val6]|[Col2,val5]|
|[val4, val5, val6]|[Col3,val6]|
|[val7, val8, val9]|[Col1,val7]|
|[val7, val8, val9]|[Col2,val8]|
|[val7, val8, val9]|[Col3,val9]|
+------------------+-----------+
После взрыва мы извлекаем первый и второй элемент, которые были названы a
и b
соответственно в UDF
.
df=df.withColumn('column_name', col('vals').getItem('a'))
df=df.withColumn('value', col('vals').getItem('b')).drop('vals')
df.show()
+------------------+-----------+-----+
| ColA|column_name|value|
+------------------+-----------+-----+
|[val1, val2, val3]| Col1| val1|
|[val1, val2, val3]| Col2| val2|
|[val1, val2, val3]| Col3| val3|
|[val4, val5, val6]| Col1| val4|
|[val4, val5, val6]| Col2| val5|
|[val4, val5, val6]| Col3| val6|
|[val7, val8, val9]| Col1| val7|
|[val7, val8, val9]| Col2| val8|
|[val7, val8, val9]| Col3| val9|
+------------------+-----------+-----+
В качестве последнего шагаМы pivot
возвращаем DataFrame, чтобы получить окончательный DataFrame. Так как при повороте мы делаем aggregation
, поэтому мы агрегируем на основе first()
, который принимает первый элемент группы.
# Step 2: Pivot it back.
df = df.groupby('ColA').pivot('column_name').agg(first('value')).drop('ColA')
df.show()
+----+----+----+
|Col1|Col2|Col3|
+----+----+----+
|val1|val2|val3|
|val4|val5|val6|
|val7|val8|val9|
+----+----+----+