Обеспечение зависимости функции для распараллеливания через multidplyr на всех узлах кластера - PullRequest
0 голосов
/ 20 февраля 2019

Я написал функцию, которая помогает мне подготовить все узлы кластера, охватываемые multidplyr::get_default_cluster(), для выполнения параллельной задачи.

Это работает, но теперь я пропускаю следующий шаг: любой "из"функциональности "из коробки" в multidplyr (или в друзьях), которая помогает мне определить все функции (и в идеале также пакет) зависимости функции, которую я хочу распараллелить.

В идеале, было быбыть вызовом чего-то identify_dependencies_of("foo"), что вернуло бы мне список с элементами

  • functions (перечисляя все функции, которые вызываются стеком вызовов foo())
  • packages (перечисление всех зависимостей пакетов, используемых в стеке вызовов foo())

В настоящее время выяснение того, какие зависимости действительно необходимы для узлов, полностью "методом проб и ошибок"."и я ищу что-то, что упрощает этот процесс.

Пример

Определение функции подготовки

# devtools::install_github("hadley/multidplyr")
library(multidplyr)
library(magrittr)
#> Warning: package 'magrittr' was built under R version 3.5.2

prepare_cluster_nodes <- function(
  ...,
  funs = Character(),
  pkgs = Character()
) {
  # Capture dots -----
  dots <- rlang::dots_list(...)
  dots_env <- rlang::as_env(dots)

  # Initialize cluster -----
  cl <- multidplyr::get_default_cluster()

  # Variables -----
  dots %>%
    names() %>%
    purrr::map(function(.x) {
      value <- rlang::env_get(dots_env, nm = .x)
      multidplyr::cluster_assign_value(cl, .x, value)
    })

  # Functions -----
  funs %>%
    purrr::map(function(.x) {
      value <- get(.x)
      multidplyr::cluster_assign_value(cl, .x, value)
    })

  # Packages -----
  pkgs %>%
    purrr::map(function(.x) {
      multidplyr::cluster_library(cl, .x)
    })

  cl
}

Определение функций для примерного параллельного вызова

# Function defs -----
foo <- function(x, y) {
  x %>% 
    dplyr::mutate(
      value = bar(value) * y
    )
}

bar <- function(x) x * 1000 

Preразбиение узлов

# Prepare nodes -----
y <- -1

cl <- prepare_cluster_nodes(
  y = y, 
  funs = c("foo", "bar"), 
  pkgs = c("dplyr", "ggplot2")
)
#> Initialising 3 core cluster.
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union

cl %>% multidplyr::cluster_ls()
#> [[1]]
#> [1] "bar" "foo" "y"  
#> 
#> [[2]]
#> [1] "bar" "foo" "y"  
#> 
#> [[3]]
#> [1] "bar" "foo" "y"

Распараллеленный вызов

# Parallelized call -----
df <- tibble::tibble(id = 1:10000, value = 1:10000)
df %>% 
  multidplyr::partition(id) %>% 
  dplyr::do(foo(., y = y))
#> Warning: group_indices_.grouped_df ignores extra arguments
#> Source: party_df [10,000 x 2]
#> Groups: id
#> Shards: 3 [3,333--3,334 rows]
#> 
#> # Description: S3: party_df
#>       id  value
#>    <int>  <dbl>
#>  1     6  -6000
#>  2     8  -8000
#>  3     9  -9000
#>  4    12 -12000
#>  5    13 -13000
#>  6    16 -16000
#>  7    18 -18000
#>  8    28 -28000
#>  9    33 -33000
#> 10    37 -37000
#> # ... with 9,990 more rows

Создано в 2019-02-20 пакетом представлением (v0.2.1)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...