Присоединение к данным предыдущего месяца за каждый месяц в наборе данных PySpark - PullRequest
0 голосов
/ 28 января 2020

У меня есть набор данных, который составляется ежемесячно, и каждый месяц имеет N-количество учетных записей. У некоторых месяцев будут новые учетные записи, а некоторые учетные записи исчезнут через определенный месяц (это делается случайным образом).

Мне нужно получить баланс текущего месяца в аккаунте и вычесть его из баланса предыдущего месяца (если этот счет существовал в предыдущем месяце), в противном случае он будет равен балансу текущего месяца.

Мне предлагали присоединяться каждый месяц. т. е. присоединяйтесь к месяцу от 1 до месяца 2, от месяца 2 до месяца 3 и т. д. c Но я не совсем уверен, как это будет go ...

Вот пример набора данных:

|date      |account   |balance   |
----------------------------------
|01.01.2019|1         |40        |
|01.01.2019|2         |33        |
|01.01.2019|3         |31        |
|01.02.2019|1         |32        |
|01.02.2019|2         |56        |
|01.02.2019|4         |89        |
|01.03.2019|2         |12        |
|01.03.2019|4         |35        |
|01.03.2019|5         |76        |
|01.03.2019|6         |47        |
----------------------------------

Идентификатор аккаунта уникален для каждого ушедшего, текущего и нового прихода Счет.

Сначала я использовал f.lag, но теперь, когда есть аккаунты, которые исчезают и приходят новые, количество аккаунтов в месяц не является постоянным, поэтому я не могу отставать. Как я уже сказал, мне предложили использовать join. То есть присоединиться к январю в февраль, февраль к марту и т. Д. c.

Но я не совсем уверен, как это будет go. У кого-нибудь есть идеи?

PS Я создал эту таблицу с примером оставшейся учетной записи, новой учетной записи и учетной записи, удаленной из более поздних месяцев.

Конец цель:

|date      |account   |balance   | balance_diff_with_previous_month  |
--------------------------------------------------------------------|
|01.01.2019|1         |40        |na                                |
|01.01.2019|2         |33        |na                                |
|01.01.2019|3         |31        |na                                |
|01.02.2019|1         |32        |-8                                |
|01.02.2019|2         |56        |23                                |
|01.02.2019|4         |89        |89                                |
|01.03.2019|2         |12        |-44                               |
|01.03.2019|4         |35        |-54                               |
|01.03.2019|5         |76        |76                                |
|01.03.2019|6         |47        |47                                |
--------------------------------------------------------------------|

Как я уже сказал, f.lag не может использоваться, потому что число учетных записей в месяц не является постоянным, и я не контролирую количество учетных записей, поэтому не могу f.lag постоянное количество строк.

Кто-нибудь имеет какие-либо идеи о том, как объединить в учетной записи и / или дате (текущий месяц) с датой-1 (предыдущий месяц)?

Спасибо за чтение и помощь: )

Ответы [ 3 ]

2 голосов
/ 28 января 2020

альтернативное решение с использованием объединений ....

df = spark.createDataFrame([
            ("01.01.2019", 1, 40),("01.01.2019", 2, 33),("01.01.2019", 3, 31),
            ("01.02.2019", 1, 32), ("01.02.2019", 2, 56),("01.02.2019", 4, 89),
            ("01.03.2019", 2, 12),("01.03.2019", 4, 35),("01.03.2019", 5, 76),("01.03.2019", 6, 47)],
            ["date","account","balance"])

df.alias("current").join(
    df.alias("previous"),
    [F.to_date(F.col("previous.date"), "dd.MM.yyyy") == F.to_date(F.add_months(F.to_date(F.col("current.date"), "dd.MM.yyyy"),-1),"dd.MM.yyyy"), F.col("previous.account") == F.col("current.account")],
    "left"
).select(
    F.col("current.date").alias("date"),
    F.coalesce("current.account", "previous.account").alias("account"),
    F.col("current.balance").alias("balance"),
    (F.col("current.balance") - F.coalesce(F.col("previous.balance"), F.lit(0))).alias("balance_diff_with_previous_month")
).orderBy("date","account").show()

, что приводит к

+----------+-------+-------+--------------------------------+
|      date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|01.01.2019|      1|     40|                              40|
|01.01.2019|      2|     33|                              33|
|01.01.2019|      3|     31|                              31|
|01.02.2019|      1|     32|                              -8|
|01.02.2019|      2|     56|                              23|
|01.02.2019|      4|     89|                              89|
|01.03.2019|      2|     12|                             -44|
|01.03.2019|      4|     35|                             -54|
|01.03.2019|      5|     76|                              76|
|01.03.2019|      6|     47|                              47|
+----------+-------+-------+--------------------------------+
1 голос
/ 28 января 2020
>>> from pyspark.sql.functions import *
>>> from pyspark.sql import Window
>>> df.show()
+----------+-------+-------+
|      date|account|balance|
+----------+-------+-------+
|01.01.2019|      1|     40|
|01.01.2019|      2|     33|
|01.01.2019|      3|     31|
|01.02.2019|      1|     32|
|01.02.2019|      2|     56|
|01.02.2019|      4|     89|
|01.03.2019|      2|     12|
|01.03.2019|      4|     35|
|01.03.2019|      5|     76|
|01.03.2019|      6|     47|
+----------+-------+-------+

>>> df1 = df.withColumn("date", expr("to_date(date, 'dd.MM.yyyy')"))
>>> W = Window.partitionBy("account").orderBy("date")
>>> df1.withColumn("balance_diff_with_previous_month", col("balance") - lag(col("balance"),1,0).over(W)).show()
+----------+-------+-------+--------------------------------+
|      date|account|balance|balance_diff_with_previous_month|
+----------+-------+-------+--------------------------------+
|2019-01-01|      1|     40|                            40.0|
|2019-01-01|      2|     33|                            33.0|
|2019-01-01|      3|     31|                            31.0|
|2019-02-01|      1|     32|                            -8.0|
|2019-02-01|      2|     56|                            23.0|
|2019-02-01|      4|     89|                            89.0|
|2019-03-01|      2|     12|                           -44.0|
|2019-03-01|      4|     35|                           -54.0|
|2019-03-01|      5|     76|                            76.0|
|2019-03-01|      6|     47|                            47.0|
+----------+-------+-------+--------------------------------+
1 голос
/ 28 января 2020

F.lag отлично работает для того, что вы хотите, если вы разделите на account и

partition = Window.partitionBy("account") \
                  .orderBy(F.col("date").cast("timestamp").cast("long"))

previousAmount = data.withColumn("balance_diff_with_previous_month", F.lag("balance").over(partition))
                     .show(10, False)
...