разделение строки на несколько строк pyspark - PullRequest
0 голосов
/ 06 июня 2018

У меня есть фрейм данных, похожий на:

df = spark.createDataFrame([(0, "departmentcode__50~#~p99189h8pk0__10483~#~prod_productcolor__Dustysalmon Pink","departmentcode__50~#~p99189h8pk0__10483~#~prod_productcolor__Dustysalmon Blue"), (1, "departmentcode__10~#~p99189h8pk0__10484~#~prod_productcolor__Dustysalmon Black","departmentcode__50~#~p99189h8pk0__10483~#~prod_productcolor__Dustysalmon Blue"), (2, "departmentcode__60~#~p99189h8pk0__10485~#~prod_productcolor__Dustysalmon White","departmentcode__50~#~p99189h8pk0__10483~#~prod_productcolor__Dustysalmon Blue")], ["id", "items_base", "item_target"])

Мне нужен новый фрейм данных, похожий на followng:

+---+-----------------+----------------+--------+------+
|id |dept0            |att0            |position|flag  |
+---+-----------------+----------------+--------+------+
|0  |departmentcode   |50              |1       |Base  |
|0  |p99189h8pk0      |10483           |2       |Base  |
|0  |prod_productcolor|Dustysalmon Pink|3       |Base  |
|0  |departmentcode   |50              |1       |Target|
|0  |p99189h8pk0      |10483           |2       |Target|
|0  |prod_productcolor|Dustysalmon Blue|3       |Target|
|1  |departmentcode   |10              |1       |Base  |
...
...
+---+-----------------+----------------+--------+------+

Я разделяю items_base и item_target с ~ # ~ и __ исоздание новых 6 строк.3 строки из items_base и 3 из item_target (где position - это позиция dept0 после операции разделения, флаг представляет, является ли это items_base или items_target)

Ответы [ 3 ]

0 голосов
/ 06 июня 2018

Вы можете использовать flatMap для преобразования СДР длины N в набор из N коллекций:

from pyspark.sql import Row

def etl(row) :
  list_row = []
  items_base = row.items_base.split('~#~')
  for item in items_base:
      row_items_base = Row(id = row.id, dept0 = item.split('__')[0], att0 = item.split('__')[1],  position = items_base.index(item) + 1, flag = 'Base')
      list_row.append(row_items_base)

  item_target = row.item_target.split('~#~')
  for item in item_target:
      row_items_base = Row(id = row.id, dept0 = item.split('__')[0], att0 = item.split('__')[1],  position = item_target.index(item) + 1, flag = 'Target')
      list_row.append(row_items_base)

  return list_row 


df.rdd.flatMap(etl).toDF().show()

Вывод:

enter image description here

0 голосов
/ 06 июня 2018

Вы можете использовать функцию udf для разделения и слияния разделенных строк и, наконец, использовать функции explode и select, чтобы получить конечный кадр данных как

from pyspark.sql import functions as f
from pyspark.sql import types as t
@f.udf(t.ArrayType(t.ArrayType(t.StringType())))
def splitUdf(base, target):
    return [s.split("__") + [str(index+1), 'base'] for index, s in enumerate(base.split("~#~"))] + [s.split("__") + [str(index+1), 'target'] for index, s in enumerate(target.split("~#~"))]

df.withColumn('exploded', f.explode(splitUdf(f.col('items_base'), f.col('item_target'))))\
    .select(f.col('id'), f.col('exploded')[0].alias('dept0'), f.col('exploded')[1].alias('att0'), f.col('exploded')[2].alias('position'), f.col('exploded')[3].alias('flag'))\
    .show(truncate=False)

, который должен дать вам

+---+-----------------+-----------------+--------+------+
|id |dept0            |att0             |position|flag  |
+---+-----------------+-----------------+--------+------+
|0  |departmentcode   |50               |1       |base  |
|0  |p99189h8pk0      |10483            |2       |base  |
|0  |prod_productcolor|Dustysalmon Pink |3       |base  |
|0  |departmentcode   |50               |1       |target|
|0  |p99189h8pk0      |10483            |2       |target|
|0  |prod_productcolor|Dustysalmon Blue |3       |target|
|1  |departmentcode   |10               |1       |base  |
|1  |p99189h8pk0      |10484            |2       |base  |
|1  |prod_productcolor|Dustysalmon Black|3       |base  |
|1  |departmentcode   |50               |1       |target|
|1  |p99189h8pk0      |10483            |2       |target|
|1  |prod_productcolor|Dustysalmon Blue |3       |target|
|2  |departmentcode   |60               |1       |base  |
|2  |p99189h8pk0      |10485            |2       |base  |
|2  |prod_productcolor|Dustysalmon White|3       |base  |
|2  |departmentcode   |50               |1       |target|
|2  |p99189h8pk0      |10483            |2       |target|
|2  |prod_productcolor|Dustysalmon Blue |3       |target|
+---+-----------------+-----------------+--------+------+

Надеюсь, ответ будет полезным

Обновлено

Или даже лучше, если вы возвращаете struct type из функции udf как

@f.udf(t.ArrayType(t.StructType([t.StructField('dept0', t.StringType(), True), t.StructField('att0', t.StringType(), True), t.StructField('position', t.IntegerType(), True), t.StructField('flag', t.StringType(), True)])))
def splitUdf(base, target):
    return [(s.split("__")[0], s.split("__")[1], index+1, 'base') for index, s in enumerate(base.split("~#~"))] + [(s.split("__")[0], s.split("__")[1], index+1, 'target') for index, s in enumerate(target.split("~#~"))]

df.withColumn('exploded', f.explode(splitUdf(f.col('items_base'), f.col('item_target'))))\
    .select(f.col('id'), f.col('exploded.*'))\
    .show(truncate=False)

который должен дать вам тот же результат

0 голосов
/ 06 июня 2018

Вам нужно сделать много шагов для достижения результата, но они не очень сложны.

base_df = df.select(
    'id',
    F.split('items_base', '~#~').alias('items_base')
).select(
    'id',
    F.posexplode('items_base')
).select(
    'id',
    F.split('col', '__').alias('items_base'),
    (F.col('pos')+1).alias('position'),
    F.lit('Base').alias('flag')
).select(
    'id',
    F.col('items_base').getItem(0).alias('dept0'),
    F.col('items_base').getItem(1).alias('att0'),
    'position',
    'flag',
)


target_df = df.select(
    'id',
    F.split('item_target', '~#~').alias('item_target')
).select(
    'id',
    F.posexplode('item_target')
).select(
    'id',
    F.split('col', '__').alias('item_target'),
    (F.col('pos')+1).alias('position'),
    F.lit('Target').alias('flag')
).select(
    'id',
    F.col('item_target').getItem(0).alias('dept0'),
    F.col('item_target').getItem(1).alias('att0'),
    'position',
    'flag',
)

base_df.union(target_df).show()

+---+-----------------+-----------------+--------+------+
| id|            dept0|             att0|position|  flag|
+---+-----------------+-----------------+--------+------+
|  0|   departmentcode|               50|       1|  Base|
|  0|      p99189h8pk0|            10483|       2|  Base|
|  0|prod_productcolor| Dustysalmon Pink|       3|  Base|
|  1|   departmentcode|               10|       1|  Base|
|  1|      p99189h8pk0|            10484|       2|  Base|
|  1|prod_productcolor|Dustysalmon Black|       3|  Base|
|  2|   departmentcode|               60|       1|  Base|
|  2|      p99189h8pk0|            10485|       2|  Base|
|  2|prod_productcolor|Dustysalmon White|       3|  Base|
|  0|   departmentcode|               50|       1|Target|
|  0|      p99189h8pk0|            10483|       2|Target|
|  0|prod_productcolor| Dustysalmon Blue|       3|Target|
|  1|   departmentcode|               50|       1|Target|
|  1|      p99189h8pk0|            10483|       2|Target|
|  1|prod_productcolor| Dustysalmon Blue|       3|Target|
|  2|   departmentcode|               50|       1|Target|
|  2|      p99189h8pk0|            10483|       2|Target|
|  2|prod_productcolor| Dustysalmon Blue|       3|Target|
+---+-----------------+-----------------+--------+------+
...