Pyspark.Проблемы с памятьюКак убедиться, что таблицы перезаписаны - PullRequest
0 голосов
/ 09 декабря 2018

В настоящее время я пытаюсь понять процессы вычислений Spark и влияние на потребление памяти.

Я работаю со Spark 2.3.2 и Python 2.7 в Zeppelin.

В основномв следующем цикле я создаю наборы.Я строю модель машинного обучения с помощью sci-kit-learn и после вычислений sci-kit-learn я выполняю много операций над кадрами данных на pyspark-dataframes.И для каждого я получаю таблицу rsmeMaeStep с 8 строками и 10 столбцами с небольшими строковыми или двойными значениями.RsmeMaeAll просто добавляет один анализ и имеет 8 * 26 = 208 строк с 10 столбцами для i = 26.

for i in range(26):
    df_features_train, df_features_validation = randomizer(dataFiltered)
    rsmeMaeStep, rsmeMaeAll = rsmeMaeAnalysis(rsmeMaeAll,df_features_train,df_features_test)
    print(i)

Я провел некоторый анализ времени для кода.Для i = 1 потребовалось 17 секунд для i = 10: 2:40, для i = 26 - 6:42.(т.е. в 9,4 или 23,6 раза дольше для 10 или 26 циклов.) Пока все как положено.У меня проблема на следующем шаге.Следующий код должен просто агрегировать от 8 до 206 строк.Для i = 1 это равно 32 секундам, для i = 7 4:43 (в 8,8 раза дольше), но для i = 26 у меня 0% через 47 минут, или происходит сбой с сообщением о нехватке памяти.

rsmeMae = rsmeMaeAll.select('set','setting','sme').orderBy('set','setting')
import pyspark.sql.functions as f
rsmeMaeAverage = rsmeMae.groupBy('setting','set').agg(f.count(('setting')).alias('nrOfRand'), f.round(f.mean('sme'),2).alias('rsme'),f.round(f.stddev('sme'),2).alias('sigmaRsme')).orderBy('set','setting')
z.show(rsmeMaeAverage)

Исходя из логики, которую я думал, все таблицы должны быть перезаписаны в каждом цикле.Только маленький rsmeMaeAll должен немного увеличиваться с каждым циклом.Но это все еще очень маленький стол.

Но Спарк, вероятно, действует по-другому.

Насколько я понимаю, код sk-learn первого шага выполняется на первом шаге.Если я правильно понимаю оценку с отложенным вычислением, то операции pySpark в моем коде начинают выполняться, когда я хочу напечатать результаты.Поэтому Spark, возможно, сохраняет все таблицы всех циклов в памяти.Это правильно?

Если я прав, мне понадобится код для вычисления кода pySpark непосредственно в конце каждого цикла.

Как я могу это сделать?

И если я это сделаю, это приведет к перезаписи таблиц в следующем цикле или потребление памяти все равно будет расти с каждым циклом?Нужно ли мне активно удалять таблицы из памяти и как?

edit: Я только что интегрировал

rsmeMaeStep.collect()
rsmeMaeAll.collect()

в цикл, чтобы убедиться, что pysparkрасчет делается сразу.Но пока первый цикл занял 55 секунд.7-е заняло более 10 минут, и через 49 минут оно перешло в rsmeMaeAll.collect () 8-го цикла.С сообщением об ошибке:

Py4JJavaError: Произошла ошибка при вызове o13488.collectToPython.: java.lang.OutOfMemoryError: Пространство кучи Java

Я действительно не понимаю экспоненциальный рост времени за цикл.И раньше я хотя бы смог запустить 10 петель.Что там происходит?

1 Ответ

0 голосов
/ 11 декабря 2018

Я думаю, что проблема как-то связана с ленивой оценкой в ​​Spark.И так как я собрал всю информацию, фрейм данных pyspark rsmeMaeAll, возможно, вся информация, необходимая для создания rsmeMaeAll, была загружена в кэш одновременно, пока я пытался вычислить вывод.

Исходя из этой идеи, я перестроил код таким образом, чтобы Spark больше не выполнял все промежуточные шаги.Кроме того, я интегрировал измерение времени и перестроил старый код в двух вариантах, чтобы сделать один вариант ближе к новой логике и каждому варианту так, чтобы вычисление выполнялось в конце каждого цикла.

Решение было следующим:

for i in range(9):
    ti0 = time.time()
    df_features_train, df_features_test = randomizer(dataFiltered)
    rsmeMaeStep = rsmeMaeAnalysis(df_features_train,df_features_test)
    rsmeMaeAllpd = rsmeMaeAllpd.append(rsmeMaeStep.toPandas())
    print(rsmeMaeAllpd)
    ti1 = time.time()
    print "Time for loop", i, ":", ti1-ti0

В rsmeMaeAnalysis я только что рассчитал результаты анализа, вернул их, преобразовал их в фрейм данных Pandas и собрал все результаты в Pandas.В результате каждый цикл занимал примерно одно и то же время, и даже после 20 циклов у меня не было проблем с памятью.Время для первых десяти циклов было следующим:

41 с, 42 с, 44 с, 40 с, 43 с, 43 с, 40 с, 39 с, 40 с, 40 с

Нотогда я хотел быть уверен, что сбор результатов в фрейме данных pyspark действительно был проблемой, поэтому я создаю код, максимально приближенный к pandas-решению, но собирая результаты в фрейме данных pyspark:

for i in range(10):
    ti0 = time.time()
    df_features_train, df_features_test = randomizer(dataFiltered)
    rsmeMaeStep = rsmeMaeAnalysis(df_features_train,df_features_test)
    rsmeMaeAll = rsmeMaeAll.union(rsmeMaeStep)
    rsmeMaeAll.show(80,False)
    ti1 = time.time()
    print "Time for loop", i, ":", ti1-ti0

Время первых восьми циклов было следующим:

43 с, 63 с, 88 с, 144 с, 162 с, 175 с, 212 с, 276 с

В исходном варианте только с измерением времени до 7-го цикла ошибки памяти не хватало:

44 с, 60 с, 73 с, 98 с, 128 с, 157 с, 198 с

В конце все еще кажется, что ленивая оценка приводит к тому, что много информации, необходимой для создания rsmeMaeAll, было загружено в кэш одновременно, хотя большая часть информации не имеет значения в конце каждого цикла.

...