Pyspark - объединяет последовательные повторяющиеся строки, но поддерживает даты начала и окончания - PullRequest
0 голосов
/ 30 сентября 2019

У меня есть кадр данных в следующем формате ...

id , name, start_date, end_date  , active
1  , albert   , 2019-08-14, 3499-12-31, 1
1  , albert   , 2019-08-13, 2019-08-14, 0
1  , albert   , 2019-06-26, 2019-08-13, 0
1  , brian   , 2018-01-17, 2019-06-26, 0
1  , brian   , 2017-07-31, 2018-01-17, 0
1  , albert   , 2017-03-31, 2018-07-31, 0
2  , diane   , 2019-07-14, 3499-12-31, 1
2  , diane   , 2019-06-13, 2019-07-14, 0
2  , ethel   , 2019-03-20, 2019-06-13, 0
2  , ethel  , 2018-01-17, 2019-03-20, 0
2  , frank   , 2017-07-31, 2018-01-17, 0
2  , frank   , 2015-03-21, 2018-07-31, 0

И я хочу объединить последовательные строки, где имя совпадает с предыдущей строкой, но сохранить правильные даты начала и окончания вокончательный вывод данных. Таким образом, правильный вывод будет ...

id , name, start_date, end_date  , active
1  , albert   , 2019-06-26, 3499-12-31, 1
1  , brian   , 2017-07-31, 2019-06-26, 0
1  , albert   , 2017-03-31, 2018-07-31, 0
2  , diane   , 2019-06-13, 3499-12-31, 1
2  , ethel   , 2018-01-17, 2019-06-13, 0
2  , frank   , 2017-03-31, 2018-01-17, 0

Количество записей для идентификатора меняется, как и количество различных имен для идентификатора.

Как этого можно достичь в pyspark? Спасибо

Ответы [ 2 ]

0 голосов
/ 01 октября 2019

Итак, немного подумав, я понял, как это сделать. Возможно, есть лучший способ, но это работает.

Сначала создайте окно, разделенное по id и упорядоченное по start_date, и захватите следующую строку.

frame = Window.partitionBy('id').orderBy(col('start_date').desc())
df = df.select('*', lag(col('name'), default=0).over(frame).alias('next_name'))

Затем, если текущее имя строкии следующие имена совпадают с набором 0, иначе набором 1 ...

df = df.withColumn('countrr', when(col('name') == col('next_name'), 0).otherwise(1))

Затем создайте расширение фрейма, чтобы взять строки между началом окна и текущей строкой, и суммируйте число col дляframe ...

frame2 = Window.partitionBy('id').orderBy(col('start_date').desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn('sumrr', sum('countrr').over(frame2)

Эффективно создает столбец, который увеличивается на единицу при изменении имени. Наконец, вы можете использовать этот новый столбец sumrr и другие столбцы, чтобы сгруппировать и взять максимальные и минимальные даты, как требуется ...

gb_df = df.groupby(['id', 'name', 'sumrr'])
result = gb_df.agg({'start_date':'min', 'end_date':'max'})

Затем вы должны присоединить активный флаг по id, имени иend_date.

Дает правильный вывод ...

0 голосов
/ 30 сентября 2019

Вы ищете df.groupby(["name", "start_date", "end_date"]).sum("active")?

Если я правильно понял ваши вопросы, вышеуказанный код сработает.

...