Вот решение
import pyspark.sql.functions as F
df = spark.createDataFrame(
[(12, 'JOE', 25, 'ASDF', 'QWER', 'ZXCV'),
(23, 'TIM', 20, 'AAAA', 'SSSS', 'DDDD'),
(34, 'BRAD', 32, 'XXXX', None, None),
(11, 'MATT', 23, 'ASDF', 'QWER', None)],
['data_id','name','AGE','data_rel_1','data_rel_2','data_rel_3']
)
# Create an array of the columns you want
cols = F.array(
*[F.col(c).alias(c) for c in ['data_rel_1', 'data_rel_2', 'data_rel_3']]
)
df.withColumn(
"data_rel", cols
).select(
'data_id',F.explode('data_rel').alias('data_rel')
).filter(
F.col('data_rel').isNotNull()
).show()
, что приводит к:
+-------+--------+
|data_id|data_rel|
+-------+--------+
| 12| ASDF|
| 12| QWER|
| 12| ZXCV|
| 23| AAAA|
| 23| SSSS|
| 23| DDDD|
| 34| XXXX|
| 11| ASDF|
| 11| QWER|
+-------+--------+
РЕДАКТИРОВАТЬ Другое решение, использующее rdd и также взорвать, может принять шаблон в качестве параметра (Это может не привести к исключениям с большим количеством столбцов)
import pyspark.sql.functions as F
#takes pattern, and explodes those cols which match pattern
def explode(row,pattern):
import re
for c in row.asDict():
if re.search(pattern, c):
yield (row['data_id'],row[c])
df = spark.createDataFrame(
[(12, 'JOE', 25, 'ASDF', 'QWER', 'ZXCV'),
(23, 'TIM', 20, 'AAAA', 'SSSS', 'DDDD'),
(34, 'BRAD', 32, 'XXXX', None, None),
(11, 'MATT', 23, 'ASDF', 'QWER', None)],['data_id','name','AGE','data_rel_1','data_rel_2','data_rel_3']
)
pattern = '/*rel/*'
df.rdd.flatMap(
lambda r:explode(r,pattern)
).toDF(
['data_id','data_rel']
).filter(
F.col('data_rel').isNotNull()
).show()