Используя фрейм данных PySpark, я пытаюсь разработать последовательность для категориальных переменных.Поэтому для каждого идентификатора мне нужно отсортировать по дате и объединить значения столбцов каждого категориального столбца, как показано в выходных данных ниже.Для конкретного идентификатора значение категориальной последовательности должно быть от начала даты-времени до текущей даты-времени затрагиваемой записи, и значения должны быть разделены пробелом для каждой записи.Обратите внимание, что здесь не должно быть потерь записей.
ВХОД:
транзакция_таблица
+----+------------------------------+------+-------+------+
| id | date | cat1 | cat2 | cat3 |
+----+------------------------------+------+-------+------+
| 1 | 2018-01-25 00:00:... | C | Text1 | val1 |
| 1 | 2018-01-25 00:00:... | A | Text1 | val3 |
| 1 | 2018-01-25 00:00:... | B | Text5 | val5 |
| 2 | 2018-01-26 00:00:... | A | Text2 | val1 |
| 2 | 2018-01-26 00:00:... | A | Text1 | val2 |
| 3 | 2018-01-27 00:00:... | C | Text6 | val1 |
| 3 | 2018-01-29 00:00:... | A | Text2 | val9 |
| 3 | 2018-01-29 00:00:... | C | Text6 | val5 |
| 3 | 2018-02-05 00:00:... | A | Text1 | val3 |
+----+------------------------------+------+-------+------+
ВЫХОД:
+----+------------------------------+----------+-------------------------+---------------------+
| id | date | cat1_seq | cat2_seq | cat3_seq |
+----+------------------------------+----------+-------------------------+---------------------+
| 1 | 2018-01-25 00:00:... | C | Text1 | val1 |
| 1 | 2018-01-25 00:00:... | C A | Text1 Text1 | val1 val3 |
| 1 | 2018-01-25 00:00:... | C A B | Text1 Text1 Text5 | val1 val3 val5 |
| 2 | 2018-01-26 00:00:... | A | Text2 | val1 |
| 2 | 2018-01-26 00:00:... | A A | Text2 Text1 | val1 val2 |
| 3 | 2018-01-27 00:00:... | C | Text6 | val1 |
| 3 | 2018-01-29 00:00:... | C A | Text6 Text2 | val1 val9 |
| 3 | 2018-01-29 00:00:... | C A C | Text6 Text2 Text6 | val1 val9 val5 |
| 3 | 2018-02-05 00:00:... | C A C A | Text6 Text2 Text6 Text1 | val1 val9 val5 val3 |
+----+------------------------------+----------+-------------------------+---------------------+
Решено: Я использовал следующий код, предложенный @Pault, чтобы сделать это -
import pyspark.sql.functions as f
from pyspark.sql import Window
df1 = df.select("*", *[f.concat_ws(" ",f.collect_list(c).over(Window.partitionBy("id").orderBy("date"))).alias(c + "_seq") for c in df.columns])