Этот вопрос по сути дублирует этот вопрос , за исключением того, что я работаю в R. Решение pyspark выглядит солидно, но я не смог выяснить, как применить collect_list
поверх оконная функция точно так же в sparklyr.
У меня есть Spark DataFrame со следующей структурой:
------------------------------
userid | date | city
------------------------------
1 | 2018-08-02 | A
1 | 2018-08-03 | B
1 | 2018-08-04 | C
2 | 2018-08-17 | G
2 | 2018-08-20 | E
2 | 2018-08-23 | F
Я пытаюсь сгруппировать DataFrame по userid
, упорядочить каждую группу по date
и свернуть столбец city
в объединение его значений. Желаемый вывод:
------------------
userid | cities
------------------
1 | A, B, C
2 | G, E, F
Проблема в том, что каждый метод, с которым я пытался это сделать, приводил к тому, что некоторые пользователи (около 3% при тестировании 5000 пользователей) не имели столбец «города» в правильном порядке.
Попытка 1: использование dplyr
и collect_list
.
my_sdf %>%
dplyr::group_by(userid) %>%
dplyr::arrange(date) %>%
dplyr::summarise(cities = paste(collect_list(city), sep = ", ")))
Попытка 2: использование replyr::gapply
, так как операция соответствует описанию «Группировка-заказ-применение».
get_cities <- . %>%
summarise(cities = paste(collect_list(city), sep = ", "))
my_sdf %>%
replyr::gapply(gcolumn = "userid",
f = get_cities,
ocolumn = "date",
partitionMethod = "group_by")
Попытка 3: написать как оконную функцию SQL.
my_sdf %>%
spark_session(sc) %>%
sparklyr::invoke("sql",
"SELECT userid, CONCAT_WS(', ', collect_list(city)) AS cities
OVER (PARTITION BY userid
ORDER BY date)
FROM my_sdf") %>%
sparklyr::sdf_register() %>%
sparklyr::sdf_copy_to(sc, ., "my_sdf", overwrite = T)
^ выдает следующую ошибку:
Error: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'OVER' expecting <EOF>(line 2, pos 19)
== SQL ==
SELECT userid, conversion_location, CONCAT_WS(' > ', collect_list(channel)) AS path
OVER (PARTITION BY userid, conversion_location
-------------------^^^
ORDER BY occurred_at)
FROM paths_model