AWS склеивает медленно, даже когда в запросе есть предложение LIMIT / WHERE / Sample - PullRequest
0 голосов
/ 28 января 2019

Мне интересно, почему мое задание Glue выполняется так медленно, даже если у запроса есть предложение LIMIT

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime, timedelta
from pyspark.sql.types import ArrayType, DateType, Row
from pyspark.sql.functions import UserDefinedFunction, regexp_replace, to_timestamp


# INIT GLUE JOB
print(">>> INIT GLUE JOB ...")
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# READ IN FLIGHTS, AIRPORTS, AGENTS TABLES
# NOTE: Bookmarks enabled for flights data catalog
flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "flights", transformation_ctx="flights")
flightsDF = flightsGDF.toDF()
flightsDF.createOrReplaceTempView("flights")

airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "airports")
airportsDF = airportsGDF.toDF()
airportsDF.createOrReplaceTempView("airports")

agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "agents")
agentsRawDF = agentsGDF.toDF()
agentsRawDF.createOrReplaceTempView("agents_raw")

agentsDF = spark.sql("""
    SELECT * FROM agents_raw
    WHERE type IN ('Airline', 'TravelAgent')
""") 
agentsDF.createOrReplaceTempView("agents")

# TRY TO PREPROCESS FLIGHTS
testDf = spark.sql("""
    SELECT 
        f.*, countryName, cityName, airportName, a.name AS agentName,
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
    FROM flights f
    LEFT JOIN agents a
    ON cast(f.agent as bigint) = a.id
    LEFT JOIN airports p
    ON cast(f.querydestinationplace as bigint) = p.airportId
    LIMIT 10
""")
df = testDf.withColumn("querydatetime", regexp_replace(testDf["querydatetime"], "-", "").cast("int"))
df = testDf.withColumn("queryoutbounddate", regexp_replace(testDf["queryoutbounddate"], "-", "").cast("int"))
df = testDf.withColumn("queryinbounddate", regexp_replace(testDf["queryinbounddate"], "-", "").cast("int"))
df = testDf.withColumn("outdeparture", to_timestamp(testDf["outdeparture"], "yyyy-MM-ddTHH:mm:ss"))
df = testDf.withColumn("outarrival", to_timestamp(testDf["outarrival"], "yyyy-MM-ddTHH:mm:ss"))
df = testDf.withColumn("indeparture", to_timestamp(testDf["indeparture"], "yyyy-MM-ddTHH:mm:ss"))
df = testDf.withColumn("inarrival", to_timestamp(testDf["inarrival"], "yyyy-MM-ddTHH:mm:ss"))
df.show()

Обратите внимание, что в SQL есть LIMIT 10. Когда я выполняю тот же запрос в Афине, это занимает всего 6 секунд.... Я пытался закомментировать часть withColumn, но она все еще работает в течение ~ 30 + минут ... почему огромная разница?

Некоторые недавние выходные данные журнала:

19/01/28 07:45:55 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING) 19/01/28 07:45:55 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
19/01/28 07:45:56 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:56 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
19/01/28 07:45:57 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:57 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
19/01/28 07:45:58 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:58 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root

... если это помогает

Какие-либо общие рекомендации о том, как отлаживать Glue?Как будет очень полезно понять, что за клей в настоящее время обрабатывает какие части медленно

========

Я пытался отфильтровать данные

df = flightsDf \
    .where(flightsDf.querydatetime > "2019-01-22") \

Или выборка

df = flightsDf \
    .sample(False, 0.000001) \

Они помогают?Кажется, это займет много времени ...

...