Я написал код в pyspark, задача заключалась в том, чтобы найти Delta / измененные записи по сравнению с паркетным файлом предыдущего дня и записать его в csv с дополнительным столбцом, отмечающим новые записи или существующую запись. Это делается путем объединения столбцов и применения base64 (encode (columns here)), называющего это hash_key.
Прочитать 9 паркетных файлов и зарегистрировать для них временные таблицы.
- Примените к нему запрос, назвав результат как df2.
- Прочтите предыдущий день паркетного файла из S3, назовите его как df1 и получите его количество.
- Зарегистрируйте временные таблицы против обоих фреймов данных как t1 и t2.
Теперь я использовал sqlContext. sql, чтобы найти измененные записи из t2 по сравнению с t1, используя hash_key, а именно df3, и получить количество строк.
Сохранение разного в файле csv и замена файла паркета данными в df2.
До сих пор все работало отлично. Но позже задача состояла в том, чтобы показать, какие строки в дельта-записях являются полностью новыми, а какие старые с измененными некоторыми столбцами. Здесь я попытался использовать код PySpark и соединил два фрейма данных df1 и df3. Но у меня были следующие ошибки: G C превышен предел накладных расходов или тайм-аут
В качестве альтернативы я попытался зарегистрировать временные таблицы для фреймов данных и выполнил sql запрос поверх Это. но проблема остается прежней. Я действительно зациклился на этом. Не знаю, почему потребляются ресурсы.
Конфигурации:
conf = (SparkConf()
.setAppName("GD_Regex")
.set("spark.executor.instances", "3")
.set("spark.executor.cores", "3")
.set("spark.sql.parquet.enableVectorizedReader", "false")
.set("spark.executor.memory", "4g")
.set("fs.s3a.server-side-encryption-algorithm", "AES256")
.set("spark.sql.autoBroadcastJoinThreshold","-1")
)
Определение разницы с помощью hash_key
s3_parquet_file = sqlContext.read.parquet(parquet_path)
historical_file_row_count = s3_parquet_file.count()
s3_parquet_file.registerTempTable("s3_parquet_file_temp")
query_response.registerTempTable("query_response_temp")
filtered_data = sqlContext.sql(
"""select * from query_response_temp where hash_key NOT IN ( SELECT hash_key FROM s3_parquet_file_temp )""")
Подход 1:
ff = filtered_data.alias('df2').join(s3_parquet_file.alias('df1'), ( filtered_data.CTMS_STUDY_NUMBER == s3_parquet_file.CTMS_STUDY_NUMBER ) & ( filtered_data.CTMS_SITE_NUMBER == s3_parquet_file.CTMS_SITE_NUMBER )
& (filtered_data.FULL_NAME_OF_PI == s3_parquet_file.FULL_NAME_OF_PI ) & ( filtered_data.PRIMARY_ROLE_NAME == s3_parquet_file.PRIMARY_ROLE_NAME) & ( filtered_data.CENTER_NAME == s3_parquet_file.CENTER_NAME ) , "outer" )\
.select('df2.*', 'df1.CTMS_STUDY_NUMBER')\
.withColumn('status', when(s3_parquet_file.CTMS_STUDY_NUMBER.isNotNull() & filtered_data.CTMS_STUDY_NUMBER.isNotNull(), 'existing').otherwise('new'))\
.filter(filtered_data.CTMS_STUDY_NUMBER.isNotNull())\
.select('df2.*', 'status')
Подход 2:
q = """SELECT query_response_temp.*, IF( query_response_temp.CTMS_STUDY_NUMBER IS NOT NULL AND s3_parquet_file_temp.CTMS_STUDY_NUMBER IS NOT NULL, "existing", "new" ) as status FROM query_response_temp FULL OUTER JOIN s3_parquet_file_temp on query_response_temp.CTMS_STUDY_NUMBER = s3_parquet_file_temp.CTMS_STUDY_NUMBER AND query_response_temp.CTMS_SITE_NUMBER = s3_parquet_file_temp.CTMS_SITE_NUMBER AND query_response_temp.FULL_NAME_OF_PI = s3_parquet_file_temp.FULL_NAME_OF_PI AND query_response_temp.PRIMARY_ROLE_NAME = s3_parquet_file_temp.PRIMARY_ROLE_NAME AND query_response_temp.CENTER_NAME = s3_parquet_file_temp.CENTER_NAME"""
delta_records = sqlContext.sql(q)
delta_records = delta_records.filter(delta_records.CTMS_STUDY_NUMBER.isNotNull())
Код, в котором возникает ошибка при достижении действия:
dataset = dataset.drop("hash_key")
print('hash_key dropped.')
dataset.coalesce(1).write.format('csv') \
.option('header', 'true') \
.option("compression", "none") \
.mode('overwrite') \
.save(_path)