В настоящее время я пытаюсь понять процессы вычислений Spark и влияние на потребление памяти.
Я работаю со Spark 2.3.2 и Python 2.7 в Zeppelin.
В основномв следующем цикле я создаю наборы.Я строю модель машинного обучения с помощью sci-kit-learn и после вычислений sci-kit-learn я выполняю много операций над кадрами данных на pyspark-dataframes.И для каждого я получаю таблицу rsmeMaeStep с 8 строками и 10 столбцами с небольшими строковыми или двойными значениями.RsmeMaeAll просто добавляет один анализ и имеет 8 * 26 = 208 строк с 10 столбцами для i = 26.
for i in range(26):
df_features_train, df_features_validation = randomizer(dataFiltered)
rsmeMaeStep, rsmeMaeAll = rsmeMaeAnalysis(rsmeMaeAll,df_features_train,df_features_test)
print(i)
Я провел некоторый анализ времени для кода.Для i = 1 потребовалось 17 секунд для i = 10: 2:40, для i = 26 - 6:42.(т.е. в 9,4 или 23,6 раза дольше для 10 или 26 циклов.) Пока все как положено.У меня проблема на следующем шаге.Следующий код должен просто агрегировать от 8 до 206 строк.Для i = 1 это равно 32 секундам, для i = 7 4:43 (в 8,8 раза дольше), но для i = 26 у меня 0% через 47 минут, или происходит сбой с сообщением о нехватке памяти.
rsmeMae = rsmeMaeAll.select('set','setting','sme').orderBy('set','setting')
import pyspark.sql.functions as f
rsmeMaeAverage = rsmeMae.groupBy('setting','set').agg(f.count(('setting')).alias('nrOfRand'), f.round(f.mean('sme'),2).alias('rsme'),f.round(f.stddev('sme'),2).alias('sigmaRsme')).orderBy('set','setting')
z.show(rsmeMaeAverage)
Исходя из логики, которую я думал, все таблицы должны быть перезаписаны в каждом цикле.Только маленький rsmeMaeAll должен немного увеличиваться с каждым циклом.Но это все еще очень маленький стол.
Но Спарк, вероятно, действует по-другому.
Насколько я понимаю, код sk-learn первого шага выполняется на первом шаге.Если я правильно понимаю оценку с отложенным вычислением, то операции pySpark в моем коде начинают выполняться, когда я хочу напечатать результаты.Поэтому Spark, возможно, сохраняет все таблицы всех циклов в памяти.Это правильно?
Если я прав, мне понадобится код для вычисления кода pySpark непосредственно в конце каждого цикла.
Как я могу это сделать?
И если я это сделаю, это приведет к перезаписи таблиц в следующем цикле или потребление памяти все равно будет расти с каждым циклом?Нужно ли мне активно удалять таблицы из памяти и как?
edit: Я только что интегрировал
rsmeMaeStep.collect()
rsmeMaeAll.collect()
в цикл, чтобы убедиться, что pysparkрасчет делается сразу.Но пока первый цикл занял 55 секунд.7-е заняло более 10 минут, и через 49 минут оно перешло в rsmeMaeAll.collect () 8-го цикла.С сообщением об ошибке:
Py4JJavaError: Произошла ошибка при вызове o13488.collectToPython.: java.lang.OutOfMemoryError: Пространство кучи Java
Я действительно не понимаю экспоненциальный рост времени за цикл.И раньше я хотя бы смог запустить 10 петель.Что там происходит?