Как повторно использовать контекст sparklyr с mclapply? - PullRequest
0 голосов
/ 16 февраля 2020

У меня есть R-код, который выполняет некоторую предварительную обработку распределенных данных в sparklyr, а затем собирает данные в локальный фрейм данных R, чтобы в итоге сохранить результат в CSV. Все работает как положено, и теперь я планирую повторно использовать контекст spark для обработки нескольких входных файлов.

Мой код выглядит примерно так: *

Однако, если я планирую запустить его параллельно для максимального использования контекста искры (если df0 обработка искры завершена и локальный R работает над этим, df1 уже может быть обработан искрой):

> library(parallel)
> library(MASS)
> mclapply(input, f, mc.cores = global_parallelism)

 *** caught segfault ***
address 0x560b2c134003, cause 'memory not mapped'
[[1]]
[1] "Error in as.vector(x, \"list\") : \n  cannot coerce type 'environment' to vector of type 'list'\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in as.vector(x, "list"): cannot coerce type 'environment' to vector of type 'list'>

[[2]]
NULL

Warning messages:
1: In mclapply(input, f, mc.cores = global_parallelism) :
  scheduled core 2 did not deliver a result, all values of the job will be affected
2: In mclapply(input, f, mc.cores = global_parallelism) :
  scheduled core 1 encountered error in user code, all values of the job will be affected

Когда я делаю аналогичные действия с Python и ThreadPoolExcutor, контекст искры распределяется между потоками, то же самое для Scala и Java.

Возможно ли повторно использовать контекст sparklyr при параллельном выполнении в R?

1 Ответ

1 голос
/ 17 февраля 2020

Да, к сожалению, объект sc класса spark_connection не может быть экспортирован в другой процесс R (даже если используется разветвленная обработка). Если вы используете пакет future.apply , являющийся частью экосистемы future , вы можете увидеть это, если вы используете:

library(future.apply)
plan(multicore)

## Look for non-exportable objects and given an error if found
options(future.globals.onReference = "error")

y <- future_lapply(input, f)

То есть вы получите:

Error: Detected a non-exportable reference (‘externalptr’) in one of the
globals (‘sc’ of class ‘spark_connection’) used in the future expression
...