ErrorMessage': 'An error occurred while calling o103.pyWriteDynamicFrame.
Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most
recent failure: Lost task 0.3 in stage 5.0
(TID 131, ip-1-2-3-4.eu-central-1.compute.internal, executor 20):
ExecutorLostFailure (executor 20 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of
5.5 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead or disabling
yarn.nodemanager.vmem-check-enabled because of YARN-4714.
Работа выполняется следующим образом (псевдокод):
- Читает CSV в DyanamicFrame
dynf
- `dynf.toDF (). Repartition (100)
Map.apply(dyndf, tf) # tf being function applied on every row
- `dynf.toDF (). Coalesce (10)
- Записывает
dyndf
как паркет в S3
Эта работабыл выполнен десятки раз с идентичной настройкой Glue (Стандартный рабочий с MaxCapacity 10.0) успешно, и повторное выполнение на CSV, на котором он завершился неудачей, обычно успешно без каких-либо настроек. Значение: это работает. Не только это. Работа выполнялась даже успешно с гораздо большими CSV, чем те, на которых она не работала.
Вот что я имею в виду с ошибкой. Я не вижу такой картины, как если CSV больше X, тогда мне нужно больше работников или что-то в этом роде.
Кто-нибудь знает, что может быть причиной этой ошибки, которая происходит несколько случайно?
Соответствующая часть кода:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
# s3://bucket/path/object
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
'SOURCE_BUCKET', # "bucket"
'SOURCE_PATH', # "path/"
'OBJECT_NAME', # "object"
'TARGET_BUCKET', # "bucket"
'TARGET_PATH', # "path/"
'PARTS_LOAD',
'PARTS_SAVE'
])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
data_DYN = glueContext.create_dynamic_frame_from_options(
connection_type="s3",
format="csv",
connection_options={
"paths":[
"s3://{sb}/{sp}{on}".format(
sb=args['SOURCE_BUCKET'],
sp=args['SOURCE_PATH'],
on=args['OBJECT_NAME']
)
]
},
format_options={
"withHeader": True,
"separator": ","
}
)
data_DF = data_DYN.toDF().repartition(int(args["PARTS_LOAD"]))
data_DYN = DynamicFrame.fromDF(data_DF, glueContext, "data_DYN")
def tf(rec):
# functions applied to elements of rec
return rec
data_DYN_2 = Map.apply(data_DYN, tf)
cols = [
'col1', 'col2', ...
]
data_DYN_3 = SelectFields.apply(data_DYN_2, cols)
data_DF_3 = data_DYN_3.toDF().cache()
data_DF_4 = data_DF_3.coalesce(int(args["PARTS_SAVE"]))
data_DYN_4 = DynamicFrame.fromDF(data_DF_4, glueContext, "data_DYN_4")
datasink = glueContext.write_dynamic_frame.from_options(
frame = data_DYN_4,
connection_type = "s3",
connection_options = {
"path": "s3://{tb}/{tp}".format(tb=args['TARGET_BUCKET'],tp=args['TARGET_PATH']),
"partitionKeys": ["col_x","col_y"]
},
format = "parquet",
transformation_ctx = "datasink"
)
job.commit()