Если ваш спор достаточно прост, вы можете сделать это с помощью SQL в конвейере через ft_sql_transformer
.Например, если вы хотите добавить изменение столбца в конвейер, вы можете сделать:
flights_pipeline <- ml_pipeline(sc) %>%
ft_dplyr_transformer(
tbl = df
) %>%
ft_sql_transformer(
"select *, distance + 47 as example from __THIS__") %>%
ft_binarizer(
input_col = "dep_delay",
output_col = "delayed",
threshold = 15
) %>%
ft_bucketizer(
input_col = "sched_dep_time",
output_col = "hours",
splits = c(400, 800, 1200, 1600, 2000, 2400)
) %>%
ft_r_formula(delayed ~ month + day + hours + distance) %>%
ml_logistic_regression()
Есть некоторые ограничения того типа кода SQL, который вы можете запустить, но яНадеюсь, что это работает для вас.Вот полный пример, который я протестировал.Обратите внимание на измененный столбец в финальной таблице.
library(nycflights13)
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", spark_version = "2.2.0")
## * Using Spark: 2.2.0
spark_flights <- sdf_copy_to(sc, flights)
df <- spark_flights %>%
filter(!is.na(dep_delay)) %>%
mutate(
month = paste0("m", month),
day = paste0("d", day)
) %>%
select(dep_delay, sched_dep_time, month, day, distance)
ft_dplyr_transformer(sc, df)
ft_dplyr_transformer(sc, df) %>%
ml_param("statement")
flights_pipeline <- ml_pipeline(sc) %>%
ft_dplyr_transformer(
tbl = df
) %>%
ft_sql_transformer(
"select *, distance + 47 as example from __THIS__") %>%
ft_binarizer(
input_col = "dep_delay",
output_col = "delayed",
threshold = 15
) %>%
ft_bucketizer(
input_col = "sched_dep_time",
output_col = "hours",
splits = c(400, 800, 1200, 1600, 2000, 2400)
) %>%
ft_r_formula(delayed ~ month + day + hours + distance) %>%
ml_logistic_regression()
flights_pipeline
partitioned_flights <- sdf_partition(
spark_flights,
training = 0.01,
testing = 0.01,
rest = 0.98
)
fitted_pipeline <- ml_fit(
flights_pipeline,
partitioned_flights$training
)
fitted_pipeline
predictions <- ml_transform(
fitted_pipeline,
partitioned_flights$testing
)