Неожиданное появление «Контейнер убит ЯРНО за превышение пределов памяти». - PullRequest
1 голос
/ 11 октября 2019
ErrorMessage': 'An error occurred while calling o103.pyWriteDynamicFrame. 
Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most
recent failure: Lost task 0.3 in stage 5.0 
(TID 131, ip-1-2-3-4.eu-central-1.compute.internal, executor 20): 
ExecutorLostFailure (executor 20 exited caused by one of the running tasks) 
Reason: Container killed by YARN for exceeding memory limits.  5.5 GB of 
5.5 GB physical memory used. Consider boosting 
spark.yarn.executor.memoryOverhead or disabling 
yarn.nodemanager.vmem-check-enabled because of YARN-4714.

Работа выполняется следующим образом (псевдокод):

  1. Читает CSV в DyanamicFrame dynf
  2. `dynf.toDF (). Repartition (100)
  3. Map.apply(dyndf, tf) # tf being function applied on every row
  4. `dynf.toDF (). Coalesce (10)
  5. Записывает dyndf как паркет в S3

Эта работабыл выполнен десятки раз с идентичной настройкой Glue (Стандартный рабочий с MaxCapacity 10.0) успешно, и повторное выполнение на CSV, на котором он завершился неудачей, обычно успешно без каких-либо настроек. Значение: это работает. Не только это. Работа выполнялась даже успешно с гораздо большими CSV, чем те, на которых она не работала.

Вот что я имею в виду с ошибкой. Я не вижу такой картины, как если CSV больше X, тогда мне нужно больше работников или что-то в этом роде.

Кто-нибудь знает, что может быть причиной этой ошибки, которая происходит несколько случайно?


Соответствующая часть кода:

import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# s3://bucket/path/object
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'SOURCE_BUCKET', # "bucket"
    'SOURCE_PATH',   # "path/"
    'OBJECT_NAME',   # "object"
    'TARGET_BUCKET', # "bucket"
    'TARGET_PATH',   # "path/"
    'PARTS_LOAD',
    'PARTS_SAVE'
])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

data_DYN = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths":[
            "s3://{sb}/{sp}{on}".format(
                sb=args['SOURCE_BUCKET'],
                sp=args['SOURCE_PATH'],
                on=args['OBJECT_NAME']
            )
        ]
    },
    format_options={
        "withHeader": True,
        "separator": ","
    }
)

data_DF = data_DYN.toDF().repartition(int(args["PARTS_LOAD"]))
data_DYN = DynamicFrame.fromDF(data_DF, glueContext, "data_DYN")

def tf(rec):
    # functions applied to elements of rec
    return rec

data_DYN_2 = Map.apply(data_DYN, tf)

cols = [
    'col1', 'col2', ...
]

data_DYN_3 = SelectFields.apply(data_DYN_2, cols)

data_DF_3 = data_DYN_3.toDF().cache()

data_DF_4 = data_DF_3.coalesce(int(args["PARTS_SAVE"]))
data_DYN_4 = DynamicFrame.fromDF(data_DF_4, glueContext, "data_DYN_4")

datasink = glueContext.write_dynamic_frame.from_options(
    frame = data_DYN_4, 
    connection_type = "s3", 
    connection_options = {
        "path": "s3://{tb}/{tp}".format(tb=args['TARGET_BUCKET'],tp=args['TARGET_PATH']),
        "partitionKeys": ["col_x","col_y"]
    }, 
    format = "parquet",
    transformation_ctx = "datasink"
)

job.commit()

1 Ответ

1 голос
/ 11 октября 2019

Я подозреваю, что виновником является .coalesce(10) из-за сокращения количества разделов на 100 -> 10 без перебалансировки данных между ними. Выполнение .repartition(10) вместо этого может исправить это за счет дополнительной случайности.

...