Обработка ошибок для стратегии разделения-применения-объединения в Sparklyr - PullRequest
0 голосов
/ 03 июня 2019

У меня есть Spark DataFrame со столбцом идентификатора под названием «ID пользователя», которым я манипулирую, используя sparklyr. Каждый userid может содержать от одной строки данных до сотен строк данных. Я применяю функцию к каждой группе userid, которая сокращает количество содержащихся в ней строк на основе определенных критериев события. Что-то вроде

sdf %>%
  group_by(userid) %>%
  ... %>%   # using dplyr::filter and dplyr::mutate
  ungroup()

Я хотел бы обернуть эту функцию в обработчике ошибок, таком как purrr::possibly, чтобы вычисления не прерывались, если в одной группе происходит ошибка.

Пока что я добился наибольшего успеха, используя пакет replyr . В частности, replyr::gapply «разделение по значениям в столбце группировки, применяет универсальное преобразование к каждой группе и затем связывает группы вместе». Существует два способа разделения данных: «group_by» и «extract». Авторы рекомендуют использовать «extract» только в том случае, если количество групп составляет 100 или меньше, но метод «group_by» не работает так, как я ожидал:

library(sparklyr)
library(dplyr) 
library(replyr)   # replyr::gapply
library(purrr)    # purrr::possibly

sc <- spark_connect(master = "local")

# Create a test data frame to use gapply on.
test_spark <- tibble(
  userid = c(1, 1, 2, 2, 3, 3),
  occurred_at = seq(1, 6)
) %>%
  sdf_copy_to(sc, ., "test_spark")

# Create a data frame that purrr::possibly should return in case of error.
default_spark <- tibble(userid = -1, max = -1, min = -1) %>%
  sdf_copy_to(sc, ., "default_spark")

#####################################################
# Method 1: gapply with partitionMethod = "group_by".
#####################################################

# Create a function which may throw an error. The group column, userid, is not 
# included since gapply( , partitionMethod = "group_by") creates it.
# - A print statement is included to show that when gapply uses "group_by", the 
# function is only called once.

fun_for_groups <- function(sdf) {
  temp <- sample(c(1,2), 1)
  print(temp)
  if (temp == 2) {
    log("a")
  } else {
    sdf %>%
      summarise(max = max(occurred_at),
                min = min(occurred_at))
  }
}

# Wrap the risk function to try and handle the error gracefully.

safe_for_groups <- purrr::possibly(fun_for_groups, otherwise = default_spark)

# Apply the safe function to each userid using gapply and "group_by".
# - The result is either a) only the default_spark data frame.
#                        b) the result expected if no error occurs in fun_for_groups.
#   I would expect the answer to have a mixture of default_spark rows and correct rows.

replyr::gapply(
  test_spark, 
  gcolumn = "userid", 
  f = safe_for_groups, 
  partitionMethod = "group_by"
)

#####################################################
# Method 2: gapply with partitionMethod = "extract".
#####################################################

# Create a function which may throw an error. The group column, userid, is 
# included since gapply( , partiionMethod = "extract") doesn't create it.
# - Include a print statement to show that when gapply uses partitionMethod 
#   "split", the function is called for each userid.

fun_for_extract <- function(df) {
  temp <- sample(c(1,2), 1)
  print(temp)
  if (temp == 2) {
    log("a")
  } else {
    df %>%
      summarise(max = max(occurred_at), 
                min = min(occurred_at),
                userid = min(userid))
  }
}

safe_for_extract <- purrr::possibly(fun_for_extract, otherwise = default_spark)

# Apply that function to each userid using gapply and "split".
# - The result dataframe has a mixture of "otherwise" rows and correct rows.

replyr::gapply(
  test_spark, 
  gcolumn = "userid", 
  f = safe_for_extract, 
  partitionMethod = "extract"
)

Насколько плоха идея использовать gapply, когда столбец группировки имеет миллионы значений? Есть ли альтернатива стратегиям обработки ошибок, представленным выше?

1 Ответ

0 голосов
/ 04 июня 2019

replyr::gapply() - это просто тонкая оболочка поверх dplyr (и в данном случае sparklyr).

Для сгруппированного режима - результат может быть верным только в том случае, если нет групповых ошибок, так как расчет выполняется одновременно. Это наиболее эффективный режим, но на самом деле не удается добиться какой-либо обработки ошибок.

Для режима извлечения возможно добавить обработку ошибок, но в текущем коде ее нет.

Как автор replyr, я бы на самом деле предложил изучить sparklyr spark_apply() метод. Gapply replyr был разработан, когда spark_apply() был недоступен в sparklyr (а также когда списки привязки данных также не были доступны в sparklyr).

Также replyr в основном находится в «режиме обслуживания» (проблемы с исправлениями для клиентов, которые использовали его в более крупных проектах) и, вероятно, не является хорошим выбором для новых проектов.

...