Как добавить больше строк в pyspark df по значению столбца - PullRequest
1 голос
/ 16 апреля 2020

Я застрял с этой проблемой довольно долго и, вероятно, делаю ее больше, чем есть на самом деле. Я постараюсь упростить это.

Я использую функции pyspark и фрейма данных вместе с моим кодом.

У меня уже есть df как:

+--+-----+---------+
|id|col1 |col2     |
+--+-----+---------+
|1 |Hello|Repeat   |
|2 |Word |Repeat   |
|3 |Aux  |No repeat|
|4 |Test |Repeat   |
+--+-----+---------+

Что я хочу достичь - повторять строки df, когда col2 - «Повторить», увеличивая значения col1 в значении + 1.

+--+-----+---------+------+
|id|col1 |col2     |col3  |
+--+-----+---------+------+
|1 |Hello|Repeat   |Hello1|
|1 |Hello|Repeat   |Hello2|
|1 |Hello|Repeat   |Hello3|
|2 |Word |Repeat   |Word1 |
|2 |Word |Repeat   |Word2 |
|2 |Word |Repeat   |Word3 |
|3 |Aux  |No repeat|Aux   |
|4 |Test |Repeat   |Test1 |
|4 |Test |Repeat   |Test2 |
|4 |Test |Repeat   |Test3 |
+--+-----+---------+------+

Мой первый подход заключался в использовании оператора withColumn для создания нового колонка с помощью udf:

my_func = udf(lambda words: (words + str(i + 1 for i in range(3))), StringType())
df = df\
    .withColumn('col3', when(col('col2') == 'No Repeat', col('col1'))
                            .otherwise(my_func(col('col1'))))

Но когда я оцениваю это в df.show (10, False) , это выдает мне ошибку. Я предполагаю, потому что я просто не могу создать больше строк с помощью функции withColumn таким образом.

Поэтому я решаю go для другого подхода, но безуспешно. Использование rdd.flatMap :

test = df.rdd.flatMap(lambda row: (row if (row.col2== 'No Repeat') else (row.col1 + str(i+1) for i in range(3))))
print(test.collect())

Но здесь я теряю схему df и не могу выбросить полную строку при условии else, это только бросить мне слова col1 плюс итератор .

Знаете ли вы какой-нибудь правильный способ решить эту проблему?

В конце моя проблема в том, что я не получаю правильный способ создать больше строк на основе значений столбцов, потому что я совсем новичок в этом мире. Также ответы, которые я нашел, кажется, не подходят для этой проблемы.

Вся помощь будет признательна.

1 Ответ

2 голосов
/ 16 апреля 2020

Один из способов - использовать условие и назначить массив, затем взорвать,

import pyspark.sql.functions as F

(df.withColumn("test",F.when(df['col2']=='Repeat',
       F.array([F.lit(str(i)) for i in range(1,4)])).otherwise(F.array(F.lit(''))))
  .withColumn("col3",F.explode(F.col("test"))).drop("test")
  .withColumn("col3",F.concat(F.col("col1"),F.col("col3")))).show()

Более точная версия, предложенная @MohammadMurtazaHashmi, будет выглядеть так:

(df.withColumn("test",F.when(df['col2']=='Repeat',
     F.array([F.concat(F.col("col1"),F.lit(str(i))) for i in range(1,4)]))
    .otherwise(F.array(F.col("col1"))))
    .select("id","col1","col2", F.explode("test"))).show()

+---+-----+---------+------+
| id| col1|     col2|  col3|
+---+-----+---------+------+
|  1|Hello|   Repeat|Hello1|
|  1|Hello|   Repeat|Hello2|
|  1|Hello|   Repeat|Hello3|
|  2| Word|   Repeat| Word1|
|  2| Word|   Repeat| Word2|
|  2| Word|   Repeat| Word3|
|  3|  Aux|No repeat|   Aux|
|  4| Test|   Repeat| Test1|
|  4| Test|   Repeat| Test2|
|  4| Test|   Repeat| Test3|
+---+-----+---------+------+
...