Вы можете использовать функцию udf
для разделения и слияния разделенных строк и, наконец, использовать функции explode
и select
, чтобы получить конечный кадр данных как
from pyspark.sql import functions as f
from pyspark.sql import types as t
@f.udf(t.ArrayType(t.ArrayType(t.StringType())))
def splitUdf(base, target):
return [s.split("__") + [str(index+1), 'base'] for index, s in enumerate(base.split("~#~"))] + [s.split("__") + [str(index+1), 'target'] for index, s in enumerate(target.split("~#~"))]
df.withColumn('exploded', f.explode(splitUdf(f.col('items_base'), f.col('item_target'))))\
.select(f.col('id'), f.col('exploded')[0].alias('dept0'), f.col('exploded')[1].alias('att0'), f.col('exploded')[2].alias('position'), f.col('exploded')[3].alias('flag'))\
.show(truncate=False)
, который должен дать вам
+---+-----------------+-----------------+--------+------+
|id |dept0 |att0 |position|flag |
+---+-----------------+-----------------+--------+------+
|0 |departmentcode |50 |1 |base |
|0 |p99189h8pk0 |10483 |2 |base |
|0 |prod_productcolor|Dustysalmon Pink |3 |base |
|0 |departmentcode |50 |1 |target|
|0 |p99189h8pk0 |10483 |2 |target|
|0 |prod_productcolor|Dustysalmon Blue |3 |target|
|1 |departmentcode |10 |1 |base |
|1 |p99189h8pk0 |10484 |2 |base |
|1 |prod_productcolor|Dustysalmon Black|3 |base |
|1 |departmentcode |50 |1 |target|
|1 |p99189h8pk0 |10483 |2 |target|
|1 |prod_productcolor|Dustysalmon Blue |3 |target|
|2 |departmentcode |60 |1 |base |
|2 |p99189h8pk0 |10485 |2 |base |
|2 |prod_productcolor|Dustysalmon White|3 |base |
|2 |departmentcode |50 |1 |target|
|2 |p99189h8pk0 |10483 |2 |target|
|2 |prod_productcolor|Dustysalmon Blue |3 |target|
+---+-----------------+-----------------+--------+------+
Надеюсь, ответ будет полезным
Обновлено
Или даже лучше, если вы возвращаете struct type из функции udf как
@f.udf(t.ArrayType(t.StructType([t.StructField('dept0', t.StringType(), True), t.StructField('att0', t.StringType(), True), t.StructField('position', t.IntegerType(), True), t.StructField('flag', t.StringType(), True)])))
def splitUdf(base, target):
return [(s.split("__")[0], s.split("__")[1], index+1, 'base') for index, s in enumerate(base.split("~#~"))] + [(s.split("__")[0], s.split("__")[1], index+1, 'target') for index, s in enumerate(target.split("~#~"))]
df.withColumn('exploded', f.explode(splitUdf(f.col('items_base'), f.col('item_target'))))\
.select(f.col('id'), f.col('exploded.*'))\
.show(truncate=False)
который должен дать вам тот же результат