как добавить идентификатор встречи, сравнив значение столбца с помощью pyspark - PullRequest
0 голосов
/ 06 августа 2020

В моем CSV есть 2 столбца, отдел, дата. Я хочу добавить новый столбец с именем id. состояние:

идентификатор по умолчанию будет 1. , если текущий отдел == следующий отдел и разница дат больше 2 дней, идентификатор увеличится. если текущий отдел == следующий отдел и дата разница меньше 2 дней. id будет таким же, как и предыдущий. если текущий отдел! = следующий отдел, то начать с id = 1. Я сделал это, используя pandas с итератором строк, как это сделать это с использованием pyspark (500000 строк).

Ожидаемый результат:

+------+------------+----+
| dept |    date    | id |
+------+------------+----+
|    1 | 04/10/2018 |  1 |
|    1 | 27/11/2018 |  2 |
|    1 | 27/11/2018 |  2 |
|    1 | 27/11/2018 |  2 |
|    1 | 27/12/2018 |  3 |
|    1 | 27/01/2019 |  4 |
|    1 | 27/02/2019 |  5 |
|    1 | 27/03/2019 |  6 |
|    1 | 27/04/2019 |  7 |
|    1 | 27/05/2019 |  8 |
|    2 | 28/12/2018 |  1 |
|    2 | 28/12/2018 |  1 |
|    2 | 28/12/2018 |  1 |
|    2 | 09/01/2019 |  2 |
|    2 | 09/01/2019 |  2 |
|    2 | 15/02/2019 |  3 |
|    2 | 15/02/2019 |  3 |
|    2 | 15/02/2019 |  3 |
|    2 | 28/02/2019 |  4 |
|    2 | 28/02/2019 |  4 |
|    2 | 02/04/2019 |  5 |
|    2 | 08/04/2019 |  6 |
|    2 | 08/04/2019 |  6 |
|    2 | 08/04/2019 |  6 |
|    2 | 09/04/2019 |  6 |
|    2 | 10/04/2019 |  6 |
|    2 | 10/04/2019 |  6 |
|    2 | 29/04/2019 |  7 |
|    2 | 06/02/2019 |  8 |
|    2 | 06/02/2019 |  8 |
|    2 | 06/02/2019 |  8 |
|    2 | 06/02/2019 |  8 |
|    2 | 06/02/2019 |  8 |
|    2 | 20/09/2018 |  9 |
|    2 | 20/09/2018 |  9 |
|    2 | 05/10/2018 | 10 |
|    2 | 05/10/2018 | 10 |
|    2 | 22/03/2019 | 11 |
|    2 | 22/03/2019 | 11 |
|    2 | 17/05/2019 | 12 |
|    3 | 20/09/2018 |  1 |
|    3 | 20/09/2018 |  1 |
|    3 | 20/09/2018 |  1 |
|    3 | 12/10/2018 |  2 |
|    3 | 12/10/2018 |  2 |
|    3 | 09/11/2018 |  3 |
|    3 | 20/12/2018 |  4 |
|    3 | 22/03/2019 |  5 |
|    3 | 22/03/2019 |  5 |
|    3 | 09/04/2019 |  6 |
+------+------------+----+

1 Ответ

1 голос
/ 06 августа 2020

Мне пришлось немного обновить ваши данные, потому что ваши даты не совсем упорядочены в вашем примере.

Использование задержки с кумулятивной суммой должно помочь в вашем варианте использования,

Вот рабочая версия с PySpark:

df = spark.createDataFrame(
    [
    (1, "04/10/2018", 1),
    (1, "27/11/2018", 2),
    (1, "27/11/2018", 2),
    (1, "27/11/2018", 2),
    (1, "27/12/2018", 3),
    (1, "27/01/2019", 4),
    (1, "27/02/2019", 5),
    (1, "27/03/2019", 6),
    (1, "27/04/2019", 7),
    (1, "27/05/2019", 8),
    (2, "28/12/2018", 1),
    (2, "28/12/2018", 1),
    (2, "28/12/2018", 1),
    (2, "09/01/2019", 2),
    (2, "09/01/2019", 2),
    (2, "15/02/2019", 3),
    (2, "15/02/2019", 3),
    (2, "15/02/2019", 3),
    (2, "28/02/2019", 4),
    (2, "28/02/2019", 4),
    (2, "02/04/2019", 5),
    (2, "08/04/2019", 6),
    (2, "08/04/2019", 6),
    (2, "08/04/2019", 6),
    (2, "09/04/2019", 6),
    (2, "10/04/2019", 6),
    (2, "10/04/2019", 6),
    (2, "29/04/2019", 7),
    (2, "06/05/2019", 8),
    (2, "06/05/2019", 8),
    (2, "06/05/2019", 8),
    (2, "06/05/2019", 8),
    (2, "06/05/2019", 8),
    (2, "20/09/2019", 9),
    (2, "20/09/2019", 9),
    (2, "05/10/2019", 10),
    (2, "05/10/2019", 10),
    (2, "22/03/2020", 11),
    (2, "22/03/2020", 11),
    (2, "17/05/2020", 12),
    (3, "20/09/2018", 1),
    (3, "20/09/2018", 1),
    (3, "20/09/2018", 1),
    (3, "12/10/2018", 2),
    (3, "12/10/2018", 2),
    (3, "09/11/2018", 3),
    (3, "20/12/2018", 4),
    (3, "22/03/2019", 5),
    (3, "22/03/2019", 5),
    (3, "09/04/2019", 6),
    ],
    ['dept', 'date', 'target']
)
from pyspark.sql.functions import col, to_timestamp, when, coalesce, lit, datediff, lag, sum
from pyspark.sql import Window

window = Window.partitionBy("dept").orderBy("date_parsed")
window_cusum = (
    Window
    .partitionBy('dept')
    .orderBy('date_parsed')
    .rangeBetween(Window.unboundedPreceding, 0)
)

final_df = (
    df
    .withColumn('date_parsed', to_timestamp(col('date'), 'dd/MM/yyyy'))
    .withColumn('diff',
        when(
            datediff(col("date_parsed"), lag(col("date_parsed")).over(window)) <= 2,
            True
        ).otherwise(False)
    )
    .withColumn('cusum_of_false',
        sum(
            when(~ col("diff"), lit(1)
            ).otherwise(lit(0))
        ).over(window_cusum)
    )
    .withColumn("check_working", col("target") == col("cusum_of_false"))
)

final_df.orderBy("dept", "date_parsed", "cusum_of_false").show()

row_count = final_df.count()
check_working = final_df.agg(sum(when(col("check_working"), lit(1)).otherwise(lit(0)))).collect()[0][0]

assert row_count == check_working
...