Вы получаете эту ошибку, потому что вы фактически достигли знаменитого предела 2G, который есть у нас в Spark https://issues.apache.org/jira/browse/SPARK-6235
Решение состоит в том, чтобы перераспределить ваши данные перед подачей их в алгоритм.
Это на самом деле две ошибки в этом посте:
- Работа с локальными данными.
- Древовидные модели в Spark требуют много памяти.
Итак, давайте рассмотрим ваш код, который кажется безвредным;
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
Итак, что делает последняя строка?
copy_to
(не предназначен для больших наборов данных), фактически просто копирует локальный фрейм данных R в 1-секционный Spark DataFrame
Так что вам просто нужно перераспределить ваши данные, чтобы сделатьУбедитесь, что после того, как конвейер подготовит ваши данные перед подачей в gbt
, размер раздела будет меньше 2 ГБ.
Таким образом, вы можете просто сделать следующее, чтобы перераспределить ваши данные:
# 20 is an arbitrary number I chose to test and it seems to work well in this case,
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <-
copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%
sdf_repartition(partitions = 20)
PS1: max_memory_in_mb
- это объем памяти, который вы предоставляете для gbt
вычисляет это статистика.Это не связано напрямую с количеством вводимых данных.
PS2: Если вы не установили достаточно памяти для своих исполнителей, вы можете столкнуться с java.lang.OutOfMemoryError : GC overhead limit exceeded
РЕДАКТИРОВАТЬ: Что означает перераспределение данных?
Мы всегда можем обратиться к определению того, что такое раздел, прежде чем говорить о перераспределении.Я постараюсь быть коротким.
Раздел - это логический фрагмент большого распределенного набора данных.
Spark управляет данными с использованием разделов, что помогает распараллеливать распределенную обработку данных с минимальным сетевым трафиком для отправки данных между исполнителями.По умолчанию Spark пытается считывать данные в RDD с узлов, которые находятся рядом с ним.Поскольку Spark обычно обращается к распределенным многораздельным данным, для оптимизации операций преобразования он создает разделы для хранения фрагментов данных.
Увеличение числа разделов приведет к тому, что в каждом разделе будет меньше данных (или не будет вообще!)
источник: выдержка из @JacekLaskowski Освоение книги Apache Spark .
Но разделы данных не всегда правильны, как в этом случае.Так что передел нужен.(sdf_repartition
для sparklyr
)
sdf_repartition
будет разбрасывать и перетасовывать ваши данные по вашим узлам.то есть sdf_repartition(20)
создаст 20 разделов ваших данных вместо 1, который у вас изначально был в этом случае.
Надеюсь, это поможет.
Весь код:
library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)
pipeline <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_naive_bayes( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
model_type = "multinomial",
smoothing = 0,
thresholds = c(1, 1))
library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
pipeline2 <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_gbt_classifier( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
max_memory_in_mb = 10240, # this is amount of data that can be use for
cache_node_ids = TRUE)
model2 <- ml_fit(pipeline2, mytext_spark)
pipeline3 <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken')
model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7>
# Stages
# |--1 RegexTokenizer (Transformer)
# | <regex_tokenizer_1ce4342b543b>
# | (Parameters -- Column Names)
# | input_col: text
# | output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# | <count_vectorizer_1ce4e0e6489>
# | (Parameters -- Column Names)
# | input_col: mytoken
# | output_col: finaltoken
# | (Transformer Info)
# | vocabulary: <list>
# |--3 GBTClassificationModel (Transformer)
# | <gbt_classifier_1ce41ab30213>
# | (Parameters -- Column Names)
# | features_col: finaltoken
# | label_col: label
# | prediction_col: pcol
# | probability_col: prcol
# | raw_prediction_col: rpcol
# | (Transformer Info)
# | feature_importances: num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ...
# | num_classes: int 2
# | num_features: int 39158
# | total_num_nodes: int 540
# | tree_weights: num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ...
# | trees: <list>