AWS Glue не удается написать паркет, не хватает памяти - PullRequest
0 голосов
/ 29 января 2019

Я думаю, что AWS Glue не хватает памяти после сбоя записи выходных данных паркета ...

Произошла ошибка при вызове o126.parquet.Задание прервано из-за сбоя этапа: задача 82 на этапе 9.0 завершилась неудачно 4 раза, последний сбой: потерянная задача 82.3 на этапе 9.0 (TID 17400, ip-172-31-8-70.ap-southeast-1.compute.internal,executor 1): ExecutorLostFailure (executor 1 завершился из-за одной из запущенных задач) Причина: Контейнер уничтожен YARN за превышение пределов памяти.5,5 ГБ из 5,5 ГБ физической памяти.Рассмотрите возможность повышения spark.yarn.executor.memoryOverhead.

Более полный журнал ниже

Трассировка (последний последний вызов): Файл "script_2019-01-29-06-53-53.py ", строка 71, в файле .parquet (" s3: //.../flights2 ")" /mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/pyspark.zip/pyspark.zip/pys/readwriter.py ", строка 691, в файле паркета" /mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/py4j-0.10.4-src.zip/py4j/java_gate.line.py.yway1010 * call Файл "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", строка 63 / пользователь y / файл deco в файле "root / appcache / application_1548744646207_0001 / container_1548744646207_0001_01_000001 / py4j-0.10.4-src.zip / py4j / protocol.py ", строка 319, в get_return_value py4j.protocol.Py4JJavaError: во время вызова произошла ошибка,: org.apache.spark.SparkException: задание прервано.в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply $ mcV $ sp (FileFormatWriter.scala: 213) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun$ write $ 1.apply (FileFormatWriter.scala: 166) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply (FileFormatWriter.scala: 166) в org.apache.spark.sql.execute.SQLExecution $ .withNewExecutionId (SQLExecution.scala: 65) в org.apache.spark.sql.execution.datasources.FileFormatWriter $ .write (FileFormatWriter.scala: 166) в org.apache.spark.sql.exources.InsertIntoHadoopFsRelationCommand.run (InsertIntoHadoopFsRelationCommand.scala: 145) в org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult $ lzycompute (commands.scala: 58) в org.apache.sparktedomExex.exec.sideEffectResult (commands.scala: 56) в org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute (commands.scala: 74) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan.scala: 117) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan.scala: 117) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply (SparkPlan.scala: 138) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151) в org.apache.spark.sql.execu.SparkPlan.executeQuery (SparkPlan.scala: 135) в org.apache.spark.sql.execution.SparkPlan.execute (SparkPlan.scala: 116) в org.apache.spark.sql.execution.QueryExecution.toRdd $ lzycompute (Query $.scala: 92) в org.apache.spark.sql.execution.QueryExecution.toRdd (QueryExecution.scala: 92) в org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat (DataSource.scala: 435) вorg.apache.spark.sql.execution.datasources.DataSource.write (DataSource.scala: 471) в org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run (SaveIntoDataSourceCommand.scala: 50) в org.ap.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult $ lzycompute (commands.scala: 58) в org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult (commands.scala: 56) в org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute (command.scala: 74) вorg.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan.scala: 117) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan.scala: 117) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply (SparkPlan.scala: 138) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151)в org.apache.spark.sql.execution.SparkPlan.executeQuery (SparkPlan.scala: 135) в org.apache.spark.sql.execution.SparkPlan.execute (SparkPlan.scala: 116) в org.apache.spark.sql.execution.QueryExecution.toRdd $ lzycompute (QueryExecution.scala: 92) в org.apache.spark.sql.execution.QueryExecution.toRdd (QueryExecution.scala: 92) в org.apache.spark.sql.DataFrameWriter.runCom ().scala: 609) в org.apache.spark.sql.DataFrameWriter.save (DataFrameWriter.scala: 233) в org.apache.spark.sql.DataFrameWriter.save (DataFrameWriter.scala: 217) в org.apache.spark.sql.DataFrameWriter.parquet (DataFrameWriter.scala: 508) в sun.reflect.NativeMethodAccessorImpl.invoke0 (родной метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpho.ava.j.v.fl.Java: 498) в py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) в py4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java:357) в py4j.Gateway.invoke (Gateway.java:280) в py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) в py4j.commands.CallCommand.execute (CallCommand.java:79) в py4j.GatewayConnection.run (GatewayConnection.java:214) в java.lang.Thread.run Thread (.java: 748) Причина: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 82 на этапе 9.0 не выполнена 4 раза, последний сбой: потерянная задача 82.3 на этапе 9.0 (TID 17400, ip-172-31-8-70.ap-southeast-1.compute.internal, исполнитель 1): ExecutorLostFailure (выход executor 1 вызван одной из запущенных задач) Причина: уничтожен контейнер bY YARN за превышение пределов памяти.5,5 ГБ из 5,5 ГБ физической памяти.Попробуйте увеличить spark.yarn.executor.memoryOverhead.Отслеживание стека драйверов: в org.apache.spark.scheduler.DAGScheduler.org $ apache $ spark $ планировщик $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1517) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $.применить (DAGScheduler.scala: 1505) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1504) в scala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala:59) в scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.scala: 48) в org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: 1504) в org.apache.spark.scheduler.DAGScheduler $$anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 814) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1.apply (DAGScheduler.scala: 814) в scala.Optionscalaach 25 (опция) в org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala: 814) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive (DAGScheduler.scala: 1732) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1687) в org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1676u) или org..EventLoop $$ anon $ 1.run (EventLoop.scala: 48) в org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler.scala: 630) в org.apache.spark.SparkContext.runJob (SparkContext.scala: 2029) в org.apache.spark.sql.execution.datasources.FileFormatWriter $$ anonfun $ write $ 1.apply $ mcV $ sp (FileFormatWriter.scala: 186)

Кажется, что ошибочная строка:

.parquet("s3://pinfare-glue/flights2")

Моя работа с клеем выглядит следующим образом: как я могу решить эту проблему?Я рассматриваю возможность удаления некоторых папок из S3, чтобы Glue обрабатывал данные пакетами ... но это не масштабируется ...

Другое дело, может быть, я создаю кадр данных для каждой даты и записываю эти меньшие разделы вцикл ... но это будет очень медленно?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import regexp_replace, to_timestamp

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print(">>> READING ...")
inputGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "flights", transformation_ctx="inputGDF")
# inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://pinfare-actuary-storage-csv"], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")
print(">>> DONE READ ...")

flightsDf = inputGDF.toDF()
if bool(flightsDf.head(1)):
    df = flightsDf \
        .drop("createdat") \
        .drop("updatedat") \
        .withColumn("agent", flightsDf["agent"].cast("int")) \
        .withColumn("querydestinationplace", flightsDf["querydestinationplace"].cast("int")) \
        .withColumn("querydatetime", regexp_replace(flightsDf["querydatetime"], "-", "").cast("int")) \
        .withColumn("queryoutbounddate", regexp_replace(flightsDf["queryoutbounddate"], "-", "").cast("int")) \
        .withColumn("queryinbounddate", regexp_replace(flightsDf["queryinbounddate"], "-", "").cast("int")) \
        .withColumn("outdeparture", to_timestamp(flightsDf["outdeparture"], "yyyy-MM-ddTHH:mm:ss")) \
        .withColumn("outarrival", to_timestamp(flightsDf["outarrival"], "yyyy-MM-ddTHH:mm:ss")) \
        .withColumn("indeparture", to_timestamp(flightsDf["indeparture"], "yyyy-MM-ddTHH:mm:ss")) \
        .withColumn("inarrival", to_timestamp(flightsDf["inarrival"], "yyyy-MM-ddTHH:mm:ss")) \

    df.createOrReplaceTempView("flights")

    airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "airports")
    airportsDF = airportsGDF.toDF()
    airportsDF.createOrReplaceTempView("airports")

    agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "agents")
    agentsRawDF = agentsGDF.toDF()
    agentsRawDF.createOrReplaceTempView("agents_raw")

    agentsDF = spark.sql("""
        SELECT id, name, type FROM agents_raw
        WHERE type IN ('Airline', 'TravelAgent')
    """) 
    agentsDF.createOrReplaceTempView("agents")

    finalDf = spark.sql("""
            SELECT /*+ BROADCAST(agents) */ /*+ BROADCAST(airports) */
                f.*, countryName, cityName, airportName, a.name AS agentName,
                CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
            FROM flights f
            LEFT JOIN agents a
            ON f.agent = a.id
            LEFT JOIN airports p
            ON f.querydestinationplace = p.airportId
        """)
    print(">>> DONE PROCESS FLIGHTS")

    print("Writing ...")
    finalDf \
      .write \
      .mode("append") \
      .partitionBy(["countryName", "querydatetime"]) \
      .parquet("s3://.../flights2")
else:
    print("Nothing to write ...")

job.commit()

import boto3
glue_client = boto3.client('glue', region_name='ap-southeast-1')
glue_client.start_crawler(Name='...')

1 Ответ

0 голосов
/ 29 января 2019

Если ваше LEFT JOIN имеет отображение 1: N, это приведет к экспоненциально большим строкам в DF, что может привести к OOM.Что касается клея, то здесь нет возможности настроить свою собственную конфигурацию, например, 64 ГБ памяти на vCPU.Если это так, сначала попробуйте использовать опцию spark.yarn.executor.memoryOverhead или / и увеличить число DPU.В противном случае вам нужно создать данные с использованием предиката pushdown, а затем выполнить цикл for для всех данных

...