У меня есть два сценария: а и б.В сценарии «а» два файла CSV считываются в два кадра данных и затем объединяются в результирующий кадр данных, который затем записывается в файл CSV.Эта задача не заканчивается проблемой OOM и выполняется очень быстро: 8-9 минут для 1 миллиарда строк, 100 столбцов, 41,2 ГБ CSV-файлов каждый.
Другой сценарий, 'b', похож на 'а 'во всех аспектах, кроме одного: формат письма.Входные файлы одинаковы: 1B строк, 100 столбцов, 41,2 ГБ CSV-файлов.Этот скрипт сохраняет результирующий фрейм данных в формате ORC.Это приводит к ошибке:
An error occurred while calling o91.orc. Job aborted due to stage failure: Task 36 in stage 4.0 failed 4 times, most recent failure: Lost task 36.3 in stage 4.0 (TID 800, ip-*-*-*-*.ap-south-1.compute.internal, executor 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Код для чтения csv в orc:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrameReader, DataFrameWriter
from datetime import datetime
import time
# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print("All imports were successful.")
df = spark.read.csv(
's3://****',
header=True
)
print("First dataframe read with headers set to True")
df2 = spark.read.csv(
's3://****',
header=True
)
print("Second data frame read with headers set to True")
# Obtain columns lists
left_cols = df.columns
right_cols = df2.columns
# Prefix each dataframe's field with "left_" or "right_"
df = df.selectExpr([col + ' as left_' + col for col in left_cols])
df2 = df2.selectExpr([col + ' as right_' + col for col in right_cols])
# Perform join
# df3 = df.alias('l').join(df2.alias('r'), on='l.left_c_0' == 'r.right_c_0')
# df3 = df.alias('l').join(df2.alias('r'), on='c_0')
df3 = df.join(
df2,
df["left_column_test_0"] == df2["right_column_test_0"]
)
print("Dataframes have been joined successfully.")
output_file_path = 's3://****
df3.write.orc(
output_file_path
)
# print("Dataframe has been written to csv.")
job.commit()
Мой CSV-файл выглядит следующим образом:
0,1,2,3,4,.....99
1,2,3,4,......100
2,3,4,5,......101
.
.
.
.
[continues until the 1 billionth row]
Как мне убедиться, что мой код не вызывает ошибку OOM?