Как добавить пользовательские функции в конвейеры sparklyr - PullRequest
0 голосов
/ 19 октября 2018

Этот пример взят из документации по sparklyr

https://spark.rstudio.com/guides/pipelines/

flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = df
   ) %>%
  ft_binarizer(
    input.col = "dep_delay",
    output.col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input.col = "sched_dep_time",
    output.col = "hours",
    splits = c(400, 800, 1200, 1600, 2000, 2400)
  )  %>%
  ft_r_formula(delayed ~ month + day + hours + distance) %>% 
  ml_logistic_regression()

Из приведенного выше примера ясно, что конвейер является линейным и использует встроенные преобразования sparklyrи только функции dplyr для манипулирования данными.

Есть ли способ, которым у меня может быть собственный преобразователь (например: наличие цикла for в пользовательской определенной функции) в конвейере sparklyr?

1 Ответ

0 голосов
/ 20 октября 2018

Если ваш спор достаточно прост, вы можете сделать это с помощью SQL в конвейере через ft_sql_transformer.Например, если вы хотите добавить изменение столбца в конвейер, вы можете сделать:

flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = df
  ) %>%
  ft_sql_transformer(
    "select *,  distance + 47 as example from __THIS__") %>%
  ft_binarizer(
    input_col = "dep_delay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col = "hours",
    splits = c(400, 800, 1200, 1600, 2000, 2400)
  )  %>%
  ft_r_formula(delayed ~ month + day + hours + distance) %>% 
  ml_logistic_regression()

Есть некоторые ограничения того типа кода SQL, который вы можете запустить, но яНадеюсь, что это работает для вас.Вот полный пример, который я протестировал.Обратите внимание на измененный столбец в финальной таблице.

library(nycflights13)
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", spark_version = "2.2.0")

## * Using Spark: 2.2.0

spark_flights <- sdf_copy_to(sc, flights)


df <- spark_flights %>%
  filter(!is.na(dep_delay)) %>%
  mutate(
    month = paste0("m", month),
    day = paste0("d", day)
  ) %>%
  select(dep_delay, sched_dep_time, month, day, distance)


ft_dplyr_transformer(sc, df)


ft_dplyr_transformer(sc, df) %>%
  ml_param("statement")


flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = df
  ) %>%
  ft_sql_transformer(
    "select *,  distance + 47 as example from __THIS__") %>%
  ft_binarizer(
    input_col = "dep_delay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "sched_dep_time",
    output_col = "hours",
    splits = c(400, 800, 1200, 1600, 2000, 2400)
  )  %>%
  ft_r_formula(delayed ~ month + day + hours + distance) %>% 
  ml_logistic_regression()


flights_pipeline


partitioned_flights <- sdf_partition(
  spark_flights,
  training = 0.01,
  testing = 0.01,
  rest = 0.98
)

fitted_pipeline <- ml_fit(
  flights_pipeline,
  partitioned_flights$training
)

fitted_pipeline

predictions <- ml_transform(
  fitted_pipeline,
  partitioned_flights$testing
)
...