Я хочу использовать SparkR / sparklyr, чтобы сначала сделать несколько обработок данных (+ другие функции, зависящие от разных библиотек R) в Databricks.Основная цель - применить функцию к каждому столбцу исходного кадра данных и записать ее в хранилище.Единственное решение, которое я нашел до сих пор для выполнения этой работы, - это spark.apply из SparkR.
В функции writer
ниже могут вызываться другие функции из различных пакетов R или пользовательские функции.
library(SparkR)
library(sparklyr)
library(dplyr)
df1 <- createDataFrame(data.frame(rnorm(1000, 10, 5),rnorm(1000, 10, 5),rnorm(1000, 10, 5)))
inp <- c("COL_1", "COL_2", "COL_3")
names(df1) <- inp
writer <- function(x) {
df2 <- df1 %>%
select(x) %>%
rename(INPUT=x) %>%
#Adding additional columns is just a placeholder for other transformations taking place
mutate(add1=INPUT,
add2=INPUT,
add3=INPUT)
spark_write_parquet(df2, path=paste0(paste0("home/", x), "/"), mode="overwrite")
}
spark.apply(inp, writer)
В конце концов, я ожидаю, что 3 кадра данных будут записаны в определенный каталог как файлы Parquet.К сожалению, я получаю следующие ошибки, которые указывают на то, что пакеты, доступные в глобальной среде, недоступны на рабочих узлах.Я пробовал разные попытки решения (установка и загрузка пакетов отдельно на узлах, как описано здесь или установка и загрузка их в функции).
Error in handleErrors(returnStatus, conn) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 14.0 failed 4 times, most recent failure: Lost task 2.3 in stage 14.0 (TID 44, 10.139.64.9, executor 6): org.apache.spark.SparkException: R computation failed with
Error in df1 %>% select(x) %>% rename(INPUT = x) %>% mutate(add1 = INPUT, :
could not find function "%>%"
Calls: compute -> computeFunc -> lapply -> lapply -> FUN
Execution halted
at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:51)
[...]
В качестве альтернативы также появилось это сообщение:
Error in handleErrors(returnStatus, conn) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 12.0 failed 4 times, most recent failure: Lost task 6.3 in stage 12.0 (TID 134, 10.139.64.12, executor 6): org.apache.spark.SparkException: R computation failed with
Warning: namespace ‘sparklyr’ is not available and has been replaced
by .GlobalEnv when processing object ‘’
Error in df1 %>% select(x) %>% rename(INPUT = x) %>% mutate(add1 = INPUT, :
could not find function "%>%"
Calls: compute -> computeFunc -> lapply -> lapply -> FUN
[...]
К сожалению, пока ни одна из моих идей не сработала ... Надеюсь на помощь, спасибо!