Вы можете использовать collect_list
или collect_set
:
set.seed(1)
df <- copy_to(
sc, tibble(group = rep(c("a", "b"), 3), value = runif(6)),
name = "df"
)
result <- df %>% group_by(group) %>% summarise(values = collect_list(value))
result
# Source: lazy query [?? x 2]
# Database: spark_connection
group values
<chr> <list>
1 b <list [3]>
2 a <list [3]>
, что переводится в следующеезапрос:
result %>% show_query()
<SQL>
SELECT `group`, COLLECT_LIST(`value`) AS `values`
FROM `df`
GROUP BY `group`
с соответствующим планом выполнения :
result %>% optimizedPlan()
<jobj[213]>
org.apache.spark.sql.catalyst.plans.logical.Aggregate
Aggregate [group#259], [group#259, collect_list(value#260, 0, 0) AS values#345]
+- InMemoryRelation [group#259, value#260], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `df`
+- Scan ExistingRDD[group#259,value#260]
и схемой (со столбцом array<...>
):
root
|-- group: string (nullable = true)
|-- values: array (nullable = true)
| |-- element: double (containsNull = true)
Пожалуйста, имейте в виду, что:
- Операции подобного типа в распределенной системе очень дороги.
- В зависимости от распределения данных может оказаться невозможным.
- Сложные типы довольно сложно обрабатывать в Spark в целом, а
sparklyr
с аккуратным фокусом данных не облегчает работу.Для эффективной обработки результата вам может потребоваться расширение Scala.