PySpark G C превышен предел накладных расходов и тайм-аут - PullRequest
0 голосов
/ 12 июля 2020

Я написал код в pyspark, задача заключалась в том, чтобы найти Delta / измененные записи по сравнению с паркетным файлом предыдущего дня и записать его в csv с дополнительным столбцом, отмечающим новые записи или существующую запись. Это делается путем объединения столбцов и применения base64 (encode (columns here)), называющего это hash_key.

  1. Прочитать 9 паркетных файлов и зарегистрировать для них временные таблицы.

    • Примените к нему запрос, назвав результат как df2.
    • Прочтите предыдущий день паркетного файла из S3, назовите его как df1 и получите его количество.
    • Зарегистрируйте временные таблицы против обоих фреймов данных как t1 и t2.
  2. Теперь я использовал sqlContext. sql, чтобы найти измененные записи из t2 по сравнению с t1, используя hash_key, а именно df3, и получить количество строк.

  3. Сохранение разного в файле csv и замена файла паркета данными в df2.

До сих пор все работало отлично. Но позже задача состояла в том, чтобы показать, какие строки в дельта-записях являются полностью новыми, а какие старые с измененными некоторыми столбцами. Здесь я попытался использовать код PySpark и соединил два фрейма данных df1 и df3. Но у меня были следующие ошибки: G C превышен предел накладных расходов или тайм-аут

В качестве альтернативы я попытался зарегистрировать временные таблицы для фреймов данных и выполнил sql запрос поверх Это. но проблема остается прежней. Я действительно зациклился на этом. Не знаю, почему потребляются ресурсы.

Конфигурации:

conf = (SparkConf()
        .setAppName("GD_Regex")
        .set("spark.executor.instances", "3")
        .set("spark.executor.cores", "3")
        .set("spark.sql.parquet.enableVectorizedReader", "false")
        .set("spark.executor.memory", "4g")
        .set("fs.s3a.server-side-encryption-algorithm", "AES256")
        .set("spark.sql.autoBroadcastJoinThreshold","-1")
        )

Определение разницы с помощью hash_key

s3_parquet_file = sqlContext.read.parquet(parquet_path)
historical_file_row_count = s3_parquet_file.count()
s3_parquet_file.registerTempTable("s3_parquet_file_temp")
query_response.registerTempTable("query_response_temp")
filtered_data = sqlContext.sql(
        """select * from query_response_temp where hash_key NOT IN ( SELECT hash_key FROM s3_parquet_file_temp )""")

Подход 1:

ff = filtered_data.alias('df2').join(s3_parquet_file.alias('df1'), ( filtered_data.CTMS_STUDY_NUMBER == s3_parquet_file.CTMS_STUDY_NUMBER ) & ( filtered_data.CTMS_SITE_NUMBER == s3_parquet_file.CTMS_SITE_NUMBER ) 
        & (filtered_data.FULL_NAME_OF_PI == s3_parquet_file.FULL_NAME_OF_PI ) & ( filtered_data.PRIMARY_ROLE_NAME == s3_parquet_file.PRIMARY_ROLE_NAME) & ( filtered_data.CENTER_NAME == s3_parquet_file.CENTER_NAME ) , "outer" )\
        .select('df2.*', 'df1.CTMS_STUDY_NUMBER')\
        .withColumn('status', when(s3_parquet_file.CTMS_STUDY_NUMBER.isNotNull() & filtered_data.CTMS_STUDY_NUMBER.isNotNull(), 'existing').otherwise('new'))\
        .filter(filtered_data.CTMS_STUDY_NUMBER.isNotNull())\
        .select('df2.*', 'status')

Подход 2:

q = """SELECT query_response_temp.*, IF( query_response_temp.CTMS_STUDY_NUMBER IS NOT NULL AND s3_parquet_file_temp.CTMS_STUDY_NUMBER IS NOT NULL, "existing", "new" ) as status FROM query_response_temp FULL OUTER JOIN s3_parquet_file_temp on query_response_temp.CTMS_STUDY_NUMBER = s3_parquet_file_temp.CTMS_STUDY_NUMBER AND query_response_temp.CTMS_SITE_NUMBER = s3_parquet_file_temp.CTMS_SITE_NUMBER AND query_response_temp.FULL_NAME_OF_PI = s3_parquet_file_temp.FULL_NAME_OF_PI AND query_response_temp.PRIMARY_ROLE_NAME = s3_parquet_file_temp.PRIMARY_ROLE_NAME AND query_response_temp.CENTER_NAME = s3_parquet_file_temp.CENTER_NAME"""
        delta_records = sqlContext.sql(q)
        delta_records = delta_records.filter(delta_records.CTMS_STUDY_NUMBER.isNotNull())

Код, в котором возникает ошибка при достижении действия:

dataset = dataset.drop("hash_key")
print('hash_key dropped.')
dataset.coalesce(1).write.format('csv') \
  .option('header', 'true') \
  .option("compression", "none") \
  .mode('overwrite') \
  .save(_path)

1 Ответ

0 голосов
/ 12 июля 2020

Увеличьте executor memory и попробуйте повторно запустить задание.

...