Pyspark - добавить недостающие значения для каждого ключа? - PullRequest
2 голосов
/ 11 марта 2020

У меня есть фрейм данных Pyspark с некоторым неуникальным ключом key и некоторыми столбцами number и value.

Для большинства keys столбец number идет от 1 до 12, но для некоторых из них в numbers есть пробелы (например, у нас есть числа [1, 2, 5, 9]). Я хотел бы добавить пропущенные строки, чтобы для каждого key у нас были все numbers в диапазоне 1-12, заполненные последним увиденным значением.

Так что для таблицы

key    number    value
a      1         6
a      2         10
a      5         20
a      9         25

Я хотел бы получить

key    number    value
a      1         6
a      2         10
a      3         10
a      4         10
a      5         20
a      6         20
a      7         20
a      8         20
a      9         25
a      10        25
a      11        25
a      12        25

Я думал о создании таблицы a и массива 1-12, взрыва массива и объединении с моей исходной таблицей, а затем отдельно заполнении value столбец с предыдущим значением, используя оконную функцию, ограниченную текущей строкой. Тем не менее, это выглядит немного не элегантно, и мне интересно, есть ли лучший способ добиться того, чего я хочу?

Ответы [ 2 ]

1 голос
/ 11 марта 2020

Я думал о создании таблицы и массива 1-12, разборе массива и соединении с моей исходной таблицей, а затем отдельно заполнил столбец значения предыдущим значением, используя оконную функцию, ограниченную текущей строкой. Тем не менее, это выглядит немного не элегантно, и мне интересно, есть ли лучший способ добиться того, чего я хочу?

Я не думаю, что предложенный вами подход неэлегатен - но вы можете добиться того же, используя range вместо explode.

Сначала создайте кадр данных со всеми числами в вашем диапазоне. Вы также можете захотеть соединить это с отдельным столбцом key из вашего DataFrame.

all_numbers = spark.range(1, 13).withColumnRenamed("id", "number")
all_numbers = all_numbers.crossJoin(df.select("key").distinct()).cache()
all_numbers.show()
#+------+---+
#|number|key|
#+------+---+
#|     1|  a|
#|     2|  a|
#|     3|  a|
#|     4|  a|
#|     5|  a|
#|     6|  a|
#|     7|  a|
#|     8|  a|
#|     9|  a|
#|    10|  a|
#|    11|  a|
#|    12|  a|
#+------+---+

Теперь вы можете внешне присоединить это к вашему исходному DataFrame и прямое заполнение, используя последнее известное правильное значение . Если количество ключей достаточно мало, вы можете транслировать

from pyspark.sql.functions import broadcast, last
from pyspark.sql import Window

df.join(broadcast(all_numbers), on=["number", "key"], how="outer")\
    .withColumn(
        "value", 
        last(
            "value", 
            ignorenulls=True
        ).over(
            Window.partitionBy("key").orderBy("number")\
                .rowsBetween(Window.unboundedPreceding, 0)
        )
    )\
    .show()
#+------+---+-----+
#|number|key|value|
#+------+---+-----+
#|     1|  a|    6|
#|     2|  a|   10|
#|     3|  a|   10|
#|     4|  a|   10|
#|     5|  a|   20|
#|     6|  a|   20|
#|     7|  a|   20|
#|     8|  a|   20|
#|     9|  a|   25|
#|    10|  a|   25|
#|    11|  a|   25|
#|    12|  a|   25|
#+------+---+-----+
1 голос
/ 11 марта 2020

Вы можете сделать это без объединения . Я провел несколько тестов на это с разными пробелами, и он всегда будет работать , пока номер 1 всегда предоставляется как вход (так как вам нужна последовательность, чтобы начать оттуда), и это всегда будет в диапазоне от до 12 . Я использовал пару окон s, чтобы получить столбец, который я мог бы использовать в sequence , затем сделал пользовательскую последовательность с использованием выражений, а затем взорвался чтобы получить желаемый результат. Если по какой-либо причине у вас будут входы, в которых нет номера 1, дайте мне знать, что я обновлю свое решение.

from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import when
w=Window().partitionBy("key").orderBy("number")
w2=Window().partitionBy("key").orderBy("number").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("number2", F.lag("number").over(w)).withColumn("diff", F.when((F.col("number2").isNotNull()) & ((F.col("number")-F.col("number2")) > 1), (F.col("number")-F.col("number2"))).otherwise(F.lit(0)))\
.withColumn("diff2", F.lead("diff").over(w)).withColumn("diff2", F.when(F.col("diff2").isNull(), F.lit(0)).otherwise(F.col("diff2"))).withColumn("diff2", F.when(F.col("diff2")!=0, F.col("diff2")-1).otherwise(F.col("diff2"))).withColumn("max", F.max("number").over(w2))\
.withColumn("diff2", F.when((F.col("number")==F.col("max")) & (F.col("number")<F.lit(12)), F.lit(12)-F.col("number")).otherwise(F.col("diff2")))\
.withColumn("number2", F.when(F.col("diff2")!=0,F.expr("""sequence(number,number+diff2,1)""")).otherwise(F.expr("""sequence(number,number+diff2,0)""")))\
.drop("diff","diff2","max")\
.withColumn("number2", F.explode("number2")).drop("number")\
.select("key", F.col("number2").alias("number"), "value")\
.show()


+---+------+-----+
|key|number|value|
+---+------+-----+
|  a|     1|    6|
|  a|     2|   10|
|  a|     3|   10|
|  a|     4|   10|
|  a|     5|   20|
|  a|     6|   20|
|  a|     7|   20|
|  a|     8|   20|
|  a|     9|   25|
|  a|    10|   25|
|  a|    11|   25|
|  a|    12|   25|
+---+------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...