AWS Glue: Command failed with exit code 1

My Glue job fails with the error "Command failed with exit code 1". When I tried looking at the error logs, I did not find anything useful ...

When I looked through the logs I did see a few things, but I am not sure whether they are the cause of the error ...

19/02/02 02:25:41 INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
19/02/02 02:25:41 INFO PythonRunner: Times: total = 50, boot = -794, init = 844, finish = 0
19/02/02 02:25:41 INFO PythonRunner: Times: total = 66, boot = -492, init = 558, finish = 0

I see a lot of entries like these ... what do they mean? They do not seem to be the cause, since the job appears to progress past them.

19/02/02 02:26:46 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/02/02 02:26:46 INFO DiskBlockManager: Shutdown hook called
19/02/02 02:26:46 INFO ShutdownHookManager: Shutdown hook called
19/02/02 02:26:46 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/root/appcache/application_1549073038750_0001/spark-63e4a4fb-d13b-488e-849e-8b9e301d8f12
End of LogType:stderr

What does ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM mean? Could this be why my executors stop reporting metrics?

(screenshot: executor metrics from the Glue job run)

I should point out that the job starts with a larger number of executors, which seem to stop after a while ...

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
19/02/02 02:22:41 INFO GlueCloudwatchSink: CloudwatchSink: Obtained credentials from the Instance Profile
19/02/02 02:22:41 INFO GlueCloudwatchSink: CloudwatchSink: jobName: pinfare-testing jobRunId: jr_a7eb64f80be6184c1c069fdd6914abcb386e0796eed406f230a3cee11341c722
19/02/02 02:22:41 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@172.31.36.250:46089
19/02/02 02:22:41 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
19/02/02 02:22:41 INFO Executor: Starting executor ID 14 on host ip-172-31-32-233.ap-southeast-1.compute.internal

What about these messages about executor registration?

I also see messages like: "Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior." (a sketch of what the suggested ranged GET means follows the log excerpt below)

19/02/02 02:20:35 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
19/02/02 02:20:35 INFO Executor: Finished task 163.0 in stage 8.0 (TID 314). 2853 bytes result sent to driver
19/02/02 02:20:35 INFO FileScanRDD: Reading File path: s3://pinfare-glue/flights/querydestinationplace=9968/querydatetime=2019-01-28/part-00247-fb439e5e-62ba-4afb-8ef9-85aa4d46002f.c000.snappy.parquet, range: 0-22029, partition values: [9968,17924]
19/02/02 02:20:35 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/flights/querydestinationplace=9968/querydatetime=2019-01-28/part-00247-fb439e5e-62ba-4afb-8ef9-85aa4d46002f.c000.snappy.parquet' for reading
19/02/02 02:20:35 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
19/02/02 02:20:35 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/flights/querydestinationplace=9968/querydatetime=2019-01-28/part-00247-fb439e5e-62ba-4afb-8ef9-85aa4d46002f.c000.snappy.parquet' for reading
19/02/02 02:20:35 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
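The warning itself recommends either requesting only the bytes you need via a ranged GET or draining the stream after use. A minimal sketch of what that means, assuming direct access with boto3 and reusing the bucket/key/range from the FileScanRDD line above (this only illustrates the warning's suggestion; it is not part of the Glue script):

import boto3

# Illustrative only: the "ranged GET" the warning recommends -- ask S3 for just
# the byte range you need instead of opening the whole object and abandoning
# the stream part-way through.
s3 = boto3.client("s3")
resp = s3.get_object(
    Bucket="pinfare-glue",
    Key="flights/querydestinationplace=9968/querydatetime=2019-01-28/part-00247-fb439e5e-62ba-4afb-8ef9-85aa4d46002f.c000.snappy.parquet",
    Range="bytes=0-22029",  # same range that the FileScanRDD log line reports
)
data = resp["Body"].read()  # fully reading (or closing) the body drains the stream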

My Glue job (update: it seems this is all that is needed for Glue to fail. It looks like Glue cannot even read from the Data Catalog without running out of memory? The OOM troubleshooting suggestions only cover reading files from S3: https://docs.aws.amazon.com/glue/latest/dg/monitor-profile-debug-oom-abnormalities.html):

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import UserDefinedFunction, regexp_replace, to_timestamp
from datetime import datetime, timedelta
from pyspark.sql.types import ArrayType, IntegerType, Row

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# READ IN FLIGHTS, AIRPORTS, AGENTS TABLES
# NOTE: Bookmarks enabled for flights data catalog
airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "airports")
airportsDF = airportsGDF.toDF().select("airportId", "countryName", "cityName", "airportName")
airportsDF = airportsDF.withColumn("airportId", airportsDF["airportId"].cast("string"))
airportsDF.createOrReplaceTempView("airports")

agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "agents")
agentsRawDF = agentsGDF.toDF().select("id", "name")
agentsRawDF = agentsRawDF.withColumn("id", agentsRawDF["id"].cast("string"))
agentsRawDF.createOrReplaceTempView("agents")
# agentsRawDF.createOrReplaceTempView("agents_raw")

# agentsDF = spark.sql("""
#     SELECT * FROM agents_raw
#     WHERE type IN ('Airline', 'TravelAgent')
# """) 
# agentsDF.createOrReplaceTempView("agents")

def batch(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

arr = [13301,12929,14511,9968,15280,10193,13531,13439,16122,9498,16162,17210,12728,14534,12542,13303,16716,13311,12913,11036,17471,16240,10902,15526,17294,15671,10858,17482,12071,12337,17521,12274,10032,17396,11052,9970,12917,12195,10658,17409,13078,17416,17388,12118,10438,13113,11170,14213,9762,10871,11780,12392,15518,13536,10724,14260,16747,18490,17402,10284,10982,10431,16743,12482,10497,15168,16587,15412,17106,11017,17368,13804,15461,19461,16923,9794,12795,25396,12952,15422,10101,14147,10485,12210,25336,9449,15395,13947,11893,11109,9921,9799,15253,16945,13164,10031,17002,17152,16516,13180,16451,16437,11336,13428,10182,25405,16955,10180,12191]

def generate_date_series(start, stop):
    dates = []
    for x in range(0, (stop-start).days + 1):
        date = start + timedelta(days=x)
        dateStr = date.strftime("%Y%m%d")
        print('str', dateStr)
        dateNum = int(dateStr)
        print('num', dateNum)
        dates.append(dateNum)
    return dates
    # return [start + timedelta(days=x) for x in range(0, (stop-start).days + 1)]    

spark.udf.register("generate_date_series", generate_date_series, ArrayType(IntegerType()))

# CREATE DF FOR PAST 90 DAYS EXCLUDING PAST 7 DAYS
today = datetime.utcnow().date()
start = today - timedelta(days = 14) # TODO: CHANGE TO 90
sevenDaysAgo = today - timedelta(days = 7)
print(">>> Generate data frame for ", start, " to ", sevenDaysAgo, "... ")
relaventDatesDf = spark.createDataFrame([
    Row(start=start, stop=sevenDaysAgo)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")

relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")

for items in batch(arr, 1):
    batchStr = ','.join(map(lambda s: str(s), items))  # renamed so it does not shadow the batch() helper above

    print("===LOG:START_BATCH_%s===" % (batchStr))
    flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
        querydatetime BETWEEN '%s' AND '%s'
        AND querydestinationplace IN (%s)
    """ % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), batchStr))

    flightsDf = flightsGDF.toDF()
    flightsDf.createOrReplaceTempView("flights")

    # Attempting to reduce memory usage ... but I don't think this helps at all

    spark.catalog.dropTempView("flights")
    spark.catalog.dropTempView("expandedKeyDates")
    spark.catalog.dropTempView("distinctKeys")

    flightsGDF = None
    flightsDf = None
    resultDf = None
    distinctKeysDf = None
    expandedKeyDatesDf = None
    cleanedFlightsDf = None


job.commit()

UPDATE

I tried running explain on the query that fails after a few iterations, in case it helps:
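The query itself is not in the reduced script above; reconstructed from the parsed logical plan below, it was roughly the following (my reconstruction, not the exact statement):

resultDf = spark.sql("""
    SELECT
        f.*,
        countryName, cityName, airportName,
        a.name AS agentName,
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
    FROM flights f
    LEFT OUTER JOIN agents a ON f.agent = a.id
    LEFT OUTER JOIN airports p ON f.querydestinationplace = p.airportId
""")
resultDf.explain(True)  # prints the parsed / analyzed / optimized / physical plans shown below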

== Parsed Logical Plan ==
'Project [ArrayBuffer(f).*, 'countryName, 'cityName, 'airportName, 'a.name AS agentName#248, 'CONCAT('f.outboundlegid, -, 'f.inboundlegid, -, 'f.agent) AS key#249]
+- 'Join LeftOuter, ('f.querydestinationplace = 'p.airportId)
:- 'Join LeftOuter, ('f.agent = 'a.id)
: :- 'SubqueryAlias f
: : +- 'UnresolvedRelation `flights`
: +- 'SubqueryAlias a
: +- 'UnresolvedRelation `agents`
+- 'SubqueryAlias p
+- 'UnresolvedRelation `airports`

== Analyzed Logical Plan ==
Id: string, QueryTaskId: string, QueryOriginPlace: string, QueryOutboundDate: string, QueryInboundDate: string, QueryCabinClass: string, QueryCurrency: string, Agent: string, QuoteAgeInMinutes: string, Price: string, OutboundLegId: string, InboundLegId: string, OutDeparture: string, OutArrival: string, OutDuration: string, OutJourneyMode: string, OutStops: string, OutCarriers: string, OutOperatingCarriers: string, NumberOutStops: string, NumberOutCarriers: string, NumberOutOperatingCarriers: string, InDeparture: string, InArrival: string, ... 15 more fields
Project [Id#178, QueryTaskId#179, QueryOriginPlace#180, QueryOutboundDate#181, QueryInboundDate#182, QueryCabinClass#183, QueryCurrency#184, Agent#185, QuoteAgeInMinutes#186, Price#187, OutboundLegId#188, InboundLegId#189, OutDeparture#190, OutArrival#191, OutDuration#192, OutJourneyMode#193, OutStops#194, OutCarriers#195, OutOperatingCarriers#196, NumberOutStops#197, NumberOutCarriers#198, NumberOutOperatingCarriers#199, InDeparture#200, InArrival#201, ... 15 more fields]
+- Join LeftOuter, (querydestinationplace#210 = cast(airportId#33 as int))
:- Join LeftOuter, (agent#185 = id#84)
: :- SubqueryAlias f
: : +- SubqueryAlias flights
: : +- LogicalRDD [Id#178, QueryTaskId#179, QueryOriginPlace#180, QueryOutboundDate#181, QueryInboundDate#182, QueryCabinClass#183, QueryCurrency#184, Agent#185, QuoteAgeInMinutes#186, Price#187, OutboundLegId#188, InboundLegId#189, OutDeparture#190, OutArrival#191, OutDuration#192, OutJourneyMode#193, OutStops#194, OutCarriers#195, OutOperatingCarriers#196, NumberOutStops#197, NumberOutCarriers#198, NumberOutOperatingCarriers#199, InDeparture#200, InArrival#201, ... 10 more fields]
: +- SubqueryAlias a
: +- SubqueryAlias agents
: +- Project [cast(id#65L as string) AS id#84, name#66, imageurl#67, status#68, optimisedformobile#69, type#70, bookingnumber#71, createdat#72, updatedat#73]
: +- LogicalRDD [id#65L, name#66, imageurl#67, status#68, optimisedformobile#69, type#70, bookingnumber#71, createdat#72, updatedat#73]
+- SubqueryAlias p
+- SubqueryAlias airports
+- Project [cast(airportId#18L as string) AS airportId#33, cityId#19L, countryId#20L, airportCode#21, airportName#22, cityName#23, countryName#24]
+- LogicalRDD [airportId#18L, cityId#19L, countryId#20L, airportCode#21, airportName#22, cityName#23, countryName#24]

== Optimized Logical Plan ==
Project [Id#178, QueryTaskId#179, QueryOriginPlace#180, QueryOutboundDate#181, QueryInboundDate#182, QueryCabinClass#183, QueryCurrency#184, Agent#185, QuoteAgeInMinutes#186, Price#187, OutboundLegId#188, InboundLegId#189, OutDeparture#190, OutArrival#191, OutDuration#192, OutJourneyMode#193, OutStops#194, OutCarriers#195, OutOperatingCarriers#196, NumberOutStops#197, NumberOutCarriers#198, NumberOutOperatingCarriers#199, InDeparture#200, InArrival#201, ... 15 more fields]
+- Join LeftOuter, (querydestinationplace#210 = cast(airportId#33 as int))
:- Project [Id#178, QueryTaskId#179, QueryOriginPlace#180, QueryOutboundDate#181, QueryInboundDate#182, QueryCabinClass#183, QueryCurrency#184, Agent#185, QuoteAgeInMinutes#186, Price#187, OutboundLegId#188, InboundLegId#189, OutDeparture#190, OutArrival#191, OutDuration#192, OutJourneyMode#193, OutStops#194, OutCarriers#195, OutOperatingCarriers#196, NumberOutStops#197, NumberOutCarriers#198, NumberOutOperatingCarriers#199, InDeparture#200, InArrival#201, ... 11 more fields]
: +- Join LeftOuter, (agent#185 = id#84)
: :- LogicalRDD [Id#178, QueryTaskId#179, QueryOriginPlace#180, QueryOutboundDate#181, QueryInboundDate#182, QueryCabinClass#183, QueryCurrency#184, Agent#185, QuoteAgeInMinutes#186, Price#187, OutboundLegId#188, InboundLegId#189, OutDeparture#190, OutArrival#191, OutDuration#192, OutJourneyMode#193, OutStops#194, OutCarriers#195, OutOperatingCarriers#196, NumberOutStops#197, NumberOutCarriers#198, NumberOutOperatingCarriers#199, InDeparture#200, InArrival#201, ... 10 more fields]
: +- Project [cast(id#65L as string) AS id#84, name#66]
: +- LogicalRDD [id#65L, name#66, imageurl#67, status#68, optimisedformobile#69, type#70, bookingnumber#71, createdat#72, updatedat#73]
+- Project [cast(airportId#18L as string) AS airportId#33, airportName#22, cityName#23, countryName#24]
+- LogicalRDD [airportId#18L, cityId#19L, countryId#20L, airportCode#21, airportName#22, cityName#23, countryName#24]

== Physical Plan ==
*Project [Id#178, QueryTaskId#179, QueryOriginPlace#180, QueryOutboundDate#181, QueryInboundDate#182, QueryCabinClass#183, QueryCurrency#184, Agent#185, QuoteAgeInMinutes#186, Price#187, OutboundLegId#188, InboundLegId#189, OutDeparture#190, OutArrival#191, OutDuration#192, OutJourneyMode#193, OutStops#194, OutCarriers#195, OutOperatingCarriers#196, NumberOutStops#197, NumberOutCarriers#198, NumberOutOperatingCarriers#199, InDeparture#200, InArrival#201, ... 15 more fields]
+- SortMergeJoin [querydestinationplace#210], [cast(airportId#33 as int)], LeftOuter
:- *Sort [querydestinationplace#210 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(querydestinationplace#210, 200)
: +- *Project [Id#178, QueryTaskId#179, QueryOriginPlace#180, QueryOutboundDate#181, QueryInboundDate#182, QueryCabinClass#183, QueryCurrency#184, Agent#185, QuoteAgeInMinutes#186, Price#187, OutboundLegId#188, InboundLegId#189, OutDeparture#190, OutArrival#191, OutDuration#192, OutJourneyMode#193, OutStops#194, OutCarriers#195, OutOperatingCarriers#196, NumberOutStops#197, NumberOutCarriers#198, NumberOutOperatingCarriers#199, InDeparture#200, InArrival#201, ... 11 more fields]
: +- SortMergeJoin [agent#185], [id#84], LeftOuter
: :- *Sort [agent#185 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(agent#185, 200)
: : +- Scan ExistingRDD[Id#178,QueryTaskId#179,QueryOriginPlace#180,QueryOutboundDate#181,QueryInboundDate#182,QueryCabinClass#183,QueryCurrency#184,Agent#185,QuoteAgeInMinutes#186,Price#187,OutboundLegId#188,InboundLegId#189,OutDeparture#190,OutArrival#191,OutDuration#192,OutJourneyMode#193,OutStops#194,OutCarriers#195,OutOperatingCarriers#196,NumberOutStops#197,NumberOutCarriers#198,NumberOutOperatingCarriers#199,InDeparture#200,InArrival#201,... 10 more fields]
: +- *Sort [id#84 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#84, 200)
: +- *Project [cast(id#65L as string) AS id#84, name#66]
: +- Scan ExistingRDD[id#65L,name#66,imageurl#67,status#68,optimisedformobile#69,type#70,bookingnumber#71,createdat#72,updatedat#73]
+- *Sort [cast(airportId#33 as int) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(airportId#33 as int), 200)
+- *Project [cast(airportId#18L as string) AS airportId#33, airportName#22, cityName#23, countryName#24]
+- Scan ExistingRDD[airportId#18L,cityId#19L,countryId#20L,airportCode#21,airportName#22,cityName#23,countryName#24]
...