Как я могу обучить случайный лес с разреженной матрицей в Spark? - PullRequest
0 голосов
/ 05 июня 2018

Рассмотрим этот простой пример, в котором используется sparklyr:

library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

# Source:   table<mytext_spark> [?? x 3]
# Database: spark_connection
   text                                                                    book                label
   <chr>                                                                   <chr>               <int>
 1 SENSE AND SENSIBILITY                                                   Sense & Sensibility     0
 2 ""                                                                      Sense & Sensibility     0
 3 by Jane Austen                                                          Sense & Sensibility     0
 4 ""                                                                      Sense & Sensibility     0
 5 (1811)                                                                  Sense & Sensibility     0
 6 ""                                                                      Sense & Sensibility     0
 7 ""                                                                      Sense & Sensibility     0
 8 ""                                                                      Sense & Sensibility     0
 9 ""                                                                      Sense & Sensibility     0
10 CHAPTER 1                                                               Sense & Sensibility     0
11 ""                                                                      Sense & Sensibility     0
12 ""                                                                      Sense & Sensibility     0
13 The family of Dashwood had long been settled in Sussex.  Their estate   Sense & Sensibility     0
14 was large, and their residence was at Norland Park, in the centre of    Sense & Sensibility     0
15 their property, where, for many generations, they had lived in so       Sense & Sensibility     0
16 respectable a manner as to engage the general good opinion of their     Sense & Sensibility     0

Размер кадра данных достаточно мал (около 70k строк и 14k уникальных слов).

ТеперьОбучение модели naive bayes занимает всего несколько секунд на моем кластере.Сначала я определяю pipeline

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

, затем обучаем naive bayes модель

> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
                                    expr      min       lq     mean   median       uq      max neval
 model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832     3

Теперь проблема заключается в том, что попытка запустить любую модель на основе tree (random forest, boosted trees и т. Д.) Для одного (на самом деле крошечного!) Набора данных не будет работать.

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240,
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)
# wont work :(

Ошибка: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 69.0 не выполнено 4 раза, последний сбой: потерянное задание 0.3 на этапе 69.0 (TID1580, 1.1.1.1.1, исполнитель 5): java.lang.IllegalArgumentException: размер превышает Integer.MAX_VALUE

Я думаю, что это связано с разреженностью матричного представления токенов, ноЕсть что-нибудь, что можно сделать здесь?Это проблема sparklyr?spark проблема?Мой код неэффективен?

Спасибо!

Ответы [ 2 ]

0 голосов
/ 08 июня 2018

Вы получаете эту ошибку, потому что вы фактически достигли знаменитого предела 2G, который есть у нас в Spark https://issues.apache.org/jira/browse/SPARK-6235

Решение состоит в том, чтобы перераспределить ваши данные перед подачей их в алгоритм.

Это на самом деле две ошибки в этом посте:

  • Работа с локальными данными.
  • Древовидные модели в Spark требуют много памяти.

Итак, давайте рассмотрим ваш код, который кажется безвредным;

 library(janeaustenr) # to get some text data
 library(stringr)

 mytext <- austen_books() %>% 
    mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

 mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

Итак, что делает последняя строка?

copy_to (не предназначен для больших наборов данных), фактически просто копирует локальный фрейм данных R в 1-секционный Spark DataFrame

Так что вам просто нужно перераспределить ваши данные, чтобы сделатьУбедитесь, что после того, как конвейер подготовит ваши данные перед подачей в gbt, размер раздела будет меньше 2 ГБ.

Таким образом, вы можете просто сделать следующее, чтобы перераспределить ваши данные:

# 20 is an arbitrary number I chose to test and it seems to work well in this case, 
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <- 
 copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% 
 sdf_repartition(partitions = 20)

PS1: max_memory_in_mb - это объем памяти, который вы предоставляете для gbtвычисляет это статистика.Это не связано напрямую с количеством вводимых данных.

PS2: Если вы не установили достаточно памяти для своих исполнителей, вы можете столкнуться с java.lang.OutOfMemoryError : GC overhead limit exceeded

РЕДАКТИРОВАТЬ: Что означает перераспределение данных?

Мы всегда можем обратиться к определению того, что такое раздел, прежде чем говорить о перераспределении.Я постараюсь быть коротким.

Раздел - это логический фрагмент большого распределенного набора данных.

Spark управляет данными с использованием разделов, что помогает распараллеливать распределенную обработку данных с минимальным сетевым трафиком для отправки данных между исполнителями.По умолчанию Spark пытается считывать данные в RDD с узлов, которые находятся рядом с ним.Поскольку Spark обычно обращается к распределенным многораздельным данным, для оптимизации операций преобразования он создает разделы для хранения фрагментов данных.

Увеличение числа разделов приведет к тому, что в каждом разделе будет меньше данных (или не будет вообще!)

источник: выдержка из @JacekLaskowski Освоение книги Apache Spark .

Но разделы данных не всегда правильны, как в этом случае.Так что передел нужен.(sdf_repartition для sparklyr)

sdf_repartition будет разбрасывать и перетасовывать ваши данные по вашим узлам.то есть sdf_repartition(20) создаст 20 разделов ваших данных вместо 1, который у вас изначально был в этом случае.

Надеюсь, это поможет.

Весь код:

library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)

library(janeaustenr) # to get some text data
library(stringr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240, # this is amount of data that can be use for 
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)

pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') 

model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7> 
#   Stages 
# |--1 RegexTokenizer (Transformer)
# |    <regex_tokenizer_1ce4342b543b> 
# |     (Parameters -- Column Names)
# |      input_col: text
# |      output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# |    <count_vectorizer_1ce4e0e6489> 
# |     (Parameters -- Column Names)
# |      input_col: mytoken
# |      output_col: finaltoken
# |     (Transformer Info)
# |      vocabulary: <list> 
# |--3 GBTClassificationModel (Transformer)
# |    <gbt_classifier_1ce41ab30213> 
# |     (Parameters -- Column Names)
# |      features_col: finaltoken
# |      label_col: label
# |      prediction_col: pcol
# |      probability_col: prcol
# |      raw_prediction_col: rpcol
# |     (Transformer Info)
# |      feature_importances:  num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ... 
# |      num_classes:  int 2 
# |      num_features:  int 39158 
# |      total_num_nodes:  int 540 
# |      tree_weights:  num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ... 
# |      trees: <list> 
0 голосов
/ 08 июня 2018

Не могли бы вы предоставить полную трассировку ошибок?

Я предполагаю, что у вас недостаточно памяти.Случайные леса и gbt-деревья представляют собой ансамблевые модели, поэтому им требуется больше памяти и вычислительных ресурсов, чем наивных байесовских.

Попробуйте перераспределить данные (для начала лучше всего использовать значение spark.sparkContext.defaultParallelism), чтобы каждый из ваших работников получил меньший и более равномерно распределенный фрагмент.

Если это не сработает, попробуйте уменьшить параметр max_memory_in_mb до 256.

...