У меня есть программа PySpark, которая читает файлы json размером около 350-400 МБ и создает из них фрейм данных.
На следующем шаге я создаю запрос Spark SQL, используя createOrReplaceTempView
и выберите несколько нужных столбцов
Как только это будет сделано, я фильтрую свой фрейм данных с некоторыми условиями. До этого момента все работало нормально.
Теперь мне нужно было удалить некоторые повторяющиеся значения, используя столбец. Итак, я ввел
dropDuplicates
на следующем шаге, и он неожиданно начал давать мне StackoverflowError
Ниже приведен пример кода: -
def create_some_df(initial_df):
initial_df.createOrReplaceTempView('data')
original_df = spark.sql('select c1,c2,c3,c4 from data')
## Filter out some events
original_df = original_df.filter(filter1condition)
original_df = original_df.filter(filter2condition)
original_df = original_df.dropDuplicates(['c1'])
return original_df
Он работал нормально, пока я добавлен метод dropDuplicates. Я использую 3 узла AWS кластер EMR c5.2xlarge
Я использую PySpark с помощью команды spark-submit
в режиме YARN client
Что я пробовал
Я пытался добавить persist
и cache
перед вызовом фильтра, но это не помогло
РЕДАКТИРОВАТЬ - некоторые подробности
I понимать, что ошибка появляется, когда я вызываю свою функцию записи после многократного преобразования, т.е. первого действия. Если у меня в преобразовании dropDuplicates
перед записью, произойдет сбой с ошибкой.
Если у меня не , в моем преобразовании dropDuplicates
, запись работает нормально.