I'm using PySpark on AWS Glue. When I look at my job metrics, they usually look like this:
Notice that the number of executors drops after a while. Why is that? Also, the shuffles are spiky; is that because processing the data, once it has been fetched, takes a long time? I suspect something in my job is suboptimal and that it relies on a single machine to do most of the work. But which part? Any suggestions?
My code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import UserDefinedFunction, udf, regexp_replace, to_date, to_timestamp, date_format, lit, date_add, datediff, lag, lead, col, when
from datetime import datetime, timedelta
from pyspark.sql.types import ArrayType, StringType, DateType, Row
import math
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.window import Window
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# READ IN TABLES
airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxxxxx", table_name = "airports")
airportsDF = airportsGDF.toDF().select("airportId", "countryName", "cityName", "airportName", "airportCode")
airportsDF = airportsDF.withColumn("airportId", airportsDF["airportId"].cast("string"))
airportsDF.createOrReplaceTempView("airports")
holidaysGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxxxxx", table_name = "holidays_clean")
holidaysDF = holidaysGDF.toDF()
holidaysDF.createOrReplaceTempView("holidays")
agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxxxxx", table_name = "agents")
agentsRawDF = agentsGDF.toDF().select("id", "name")
agentsRawDF = agentsRawDF.withColumn("id", agentsRawDF["id"].cast("string"))
agentsRawDF.createOrReplaceTempView("agents")
def batch(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]
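# For illustration: batch() splits an iterable into chunks of at most n items, e.g.
#   list(batch([1, 2, 3, 4, 5], 2))  ->  [[1, 2], [3, 4], [5]]
# Note: this helper is not referenced anywhere else in this job.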
# Query destination places for whitelisted countries
# arr = [13301,12929,14511,9968,15280,10193,13531,13439,16122,9498,16162,17210,12728,14534,12542,13303,16716,13311,12913,11036,17471,16240,10902,15526,17294,15671,10858,17482,12071,12337,17521,12274,10032,17396,11052,9970,12917,12195,10658,17409,13078,17416,17388,12118,10438,13113,11170,14213,9762,10871,11780,12392,15518,13536,10724,14260,16747,18490,17402,10284,10982,10431,16743,12482,10497,15168,16587,15412,17106,11017,17368,13804,15461,19461,16923,9794,12795,25396,12952,15422,10101,14147,10485,12210,25336,9449,15395,13947,11893,11109,9921,9799,15253,16945,13164,10031,17002,17152,16516,13180,16451,16437,11336,13428,10182,25405,16955,10180,12191]
arr = [12191, 15280, 16587, 10431, 9968, 12929]
# UDF to generate date range
def generate_date_series(start, stop):
    return [start + timedelta(days=x) for x in range(0, (stop - start).days + 1)]
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))
# UDF to generate a "(n,m]" range
def getInterval(num, start, stop, incr):
    if num is None:
        return ""
    lower = math.floor((num - 1) / incr) * incr
    upper = lower + incr
    return "(%d,%d]" % (lower, upper)
spark.udf.register("getInterval", getInterval, StringType())
getIntervalUdf = udf(getInterval)
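# For illustration: getInterval(5, 0, 24, 4) -> "(4,8]", and getInterval(None, 0, 24, 4) -> "".
# The start/stop arguments are accepted but not used in the bucketing logic.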
# CREATE DF FOR PAST 90 DAYS EXCLUDING PAST 7 DAYS
today = datetime(2019, 1, 30) #datetime.utcnow().date()
start = today - timedelta(days = 14) # TODO: CHANGE TO 90
sevenDaysAgo = today - timedelta(days = 7)
print(">>> Generate data frame for ", start, " to ", sevenDaysAgo, "... ")
relaventDatesDf = spark.createDataFrame([
Row(start=start, stop=sevenDaysAgo)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")
relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")
print("===LOG:Dates===")
relaventDatesDf.show()
flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxxxxx", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
querydatetime BETWEEN '%s' AND '%s'
AND querydestinationplace IN (%s)
""" % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))
flightsDf = flightsGDF.toDF()
flightsDf = flightsDf \
.withColumn("out_date", to_date(flightsDf["outdeparture"])) \
.withColumn("in_date", to_date(flightsDf["indeparture"]))
flightsDf.createOrReplaceTempView("flights")
print("===LOG:STARTING_QUERY===")
resultDf = spark.sql("""
WITH f (
SELECT
/*+ BROADCAST(h) */
/*+ COALESCE(24) */
CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key,
f.querydatetime,
f.outboundlegid,
f.inboundlegid,
f.agent,
f.queryoutbounddate,
f.queryinbounddate,
f.price,
f.outdeparture,
f.outarrival,
f.indeparture,
f.inarrival,
f.querydestinationplace,
f.numberoutstops,
CASE WHEN type = 'HOLIDAY' AND (out_date BETWEEN start AND end)
THEN true
ELSE false
END out_is_holiday,
CASE WHEN type = 'LONG_WEEKENDS' AND (out_date BETWEEN start AND end)
THEN true
ELSE false
END out_is_longweekends,
CASE WHEN type = 'HOLIDAY' AND (in_date BETWEEN start AND end)
THEN true
ELSE false
END in_is_holiday,
CASE WHEN type = 'LONG_WEEKENDS' AND (in_date BETWEEN start AND end)
THEN true
ELSE false
END in_is_longweekends
FROM flights f
CROSS JOIN holidays h
)
SELECT
/*+ BROADCAST(a) */
/*+ BROADCAST(p) */
key,
querydatetime,
first(outboundlegid) as outboundlegid,
first(inboundlegid) as inboundlegid,
first(agent) as agent,
first(p.countryName) as countryName,
first(p.airportName) as airportName,
first(p.airportCode) as airportCode,
first(a.name) as agentName,
first(queryoutbounddate) as queryoutbounddate,
first(queryinbounddate) as queryinbounddate,
first(price) as price,
first(outdeparture) as outdeparture,
first(outarrival) as outarrival,
first(indeparture) as indeparture,
first(inarrival) as inarrival,
first(querydestinationplace) as querydestinationplace,
first(numberoutstops) as numberoutstops,
CASE WHEN array_contains(collect_set(out_is_holiday), true)
THEN 1
ELSE 0
END out_is_holiday,
CASE WHEN array_contains(collect_set(out_is_longweekends), true)
THEN 1
ELSE 0
END out_is_longweekends,
CASE WHEN array_contains(collect_set(in_is_holiday), true)
THEN 1
ELSE 0
END in_is_holiday,
CASE WHEN array_contains(collect_set(in_is_longweekends), true)
THEN 1
ELSE 0
END in_is_longweekends
FROM f
INNER JOIN agents a
ON f.agent = a.id
INNER JOIN airports p
ON f.querydestinationplace = p.airportId
GROUP BY
querydatetime,
key
""")
# resultDf.explain(True)
print("===LOG:ADDING_COLUMNS===")
resultDf = resultDf \
.withColumn("querydatetime", resultDf["querydatetime"].cast("date")) \
.withColumn("queryoutbounddate", resultDf["queryoutbounddate"].cast("date")) \
.withColumn("queryinbounddate", resultDf["queryinbounddate"].cast("date")) \
.withColumn("outdeparture", to_timestamp(resultDf["outdeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
.withColumn("outarrival", to_timestamp(resultDf["outarrival"], "yyyy-MM-dd'T'HH:mm:ss")) \
.withColumn("indeparture", to_timestamp(resultDf["indeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
.withColumn("inarrival", to_timestamp(resultDf["inarrival"], "yyyy-MM-dd'T'HH:mm:ss"))
resultDf.createOrReplaceTempView("flights")
# GET DISTINCT DATASET
print("===LOG:GETTING_DISTINCT===")
distinctKeysDf = spark.sql("""
SELECT key
FROM flights
GROUP BY key
""")
distinctKeysDf.createOrReplaceTempView("distinctKeys")
# GET RELAVENT DATES DATASET
print("===LOG:EXPANDING_DATASET===")
expandedKeyDatesDf = spark.sql("""
SELECT
/*+ BROADCAST(relaventDates) */
key, querydatetime
FROM relaventDates
CROSS JOIN distinctKeys
""")
expandedKeyDatesDf.createOrReplaceTempView("expandedKeyDates")
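# Builds the full key x querydatetime grid; the LEFT JOIN below then exposes (key, date)
# combinations with no observed quote as rows whose flight columns are NULL.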
print("===LOG:GENERATE_CLEANED_DATASET===")
cleanedFlightsDf = spark.sql("""
SELECT
e.key AS master_key,
e.querydatetime AS master_querydatetime,
f.*
FROM expandedKeyDates e
LEFT JOIN flights f
ON e.key = f.key
AND e.querydatetime = f.querydatetime
""")
cleanedFlightsDf = cleanedFlightsDf \
.withColumn("created_day", date_format(cleanedFlightsDf["querydatetime"], "EEEE")) \
.withColumn("created_month", date_format(cleanedFlightsDf["querydatetime"], "yyyy-MM")) \
.withColumn("created_month_m", date_format(cleanedFlightsDf["querydatetime"], "M").cast("int")) \
.withColumn("created_week", date_format(cleanedFlightsDf["querydatetime"], "w").cast("int")) \
.withColumn("out_date", cleanedFlightsDf["outdeparture"].cast("date")) \
.withColumn("out_day", date_format(cleanedFlightsDf["outdeparture"], "EEEE")) \
.withColumn("out_month", date_format(cleanedFlightsDf["outdeparture"], "yyyy-MM")) \
.withColumn("out_month_m", date_format(cleanedFlightsDf["outdeparture"], "M").cast("int")) \
.withColumn("out_week", date_format(cleanedFlightsDf["outdeparture"], "w").cast("int")) \
.withColumn("out_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["outdeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \
.withColumn("out_hour", date_format(cleanedFlightsDf["outdeparture"], "k").cast("int")) \
.withColumn("in_date", cleanedFlightsDf["indeparture"].cast("date")) \
.withColumn("in_day", date_format(cleanedFlightsDf["indeparture"], "EEEE")) \
.withColumn("in_month", date_format(cleanedFlightsDf["indeparture"], "yyyy-MM")) \
.withColumn("in_month_m", date_format(cleanedFlightsDf["indeparture"], "M").cast("int")) \
.withColumn("in_week", date_format(cleanedFlightsDf["indeparture"], "w").cast("int")) \
.withColumn("in_departure_interval", getIntervalUdf(date_format(cleanedFlightsDf["indeparture"], "H").cast("int"), lit(0), lit(24), lit(4))) \
.withColumn("in_hour", date_format(cleanedFlightsDf["indeparture"], "k").cast("int")) \
.withColumn("price", cleanedFlightsDf["price"].cast("decimal"))
cleanedFlightsDf = cleanedFlightsDf.withColumn("advance_days", datediff(cleanedFlightsDf["out_date"], date_add(cleanedFlightsDf['querydatetime'], 1)))
cleanedFlightsDf = cleanedFlightsDf.withColumn("advance_days_cat", getIntervalUdf(cleanedFlightsDf["advance_days"], lit(0), lit(14*12), lit(14)))
cleanedFlightsDf = cleanedFlightsDf \
.withColumn("price_a1", lead("price").over(
Window.partitionBy("key").orderBy("querydatetime")
))
cleanedFlightsDf = cleanedFlightsDf \
.withColumn("price_a2", lead("price_a1").over(
Window.partitionBy("key").orderBy("querydatetime")
))
cleanedFlightsDf = cleanedFlightsDf \
.withColumn("price_a3", lead("price_a2").over(
Window.partitionBy("key").orderBy("querydatetime")
))
cleanedFlightsDf = cleanedFlightsDf \
.withColumn("price_a4", lead("price_a3").over(
Window.partitionBy("key").orderBy("querydatetime")
))
cleanedFlightsDf = cleanedFlightsDf \
.withColumn("price_a5", lead("price_a4").over(
Window.partitionBy("key").orderBy("querydatetime")
))
cleanedFlightsDf = cleanedFlightsDf \
.withColumn("price_a6", lead("price_a5").over(
Window.partitionBy("key").orderBy("querydatetime")
))
cleanedFlightsDf = cleanedFlightsDf \
.withColumn("price_a7", lead("price_a6").over(
Window.partitionBy("key").orderBy("querydatetime")
))
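# A more compact equivalent (sketch, not what runs above): lead() accepts an offset, so the
# seven chained withColumn calls could be generated in one loop over a single window, e.g.
#   w = Window.partitionBy("key").orderBy("querydatetime")
#   for i in range(1, 8):
#       cleanedFlightsDf = cleanedFlightsDf.withColumn("price_a%d" % i, lead("price", i).over(w))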
# print("===LOG:COUNT_BEFORE_FILTER===")
# print(cleanedFlightsDf.count())
cleanedFlightsDf = cleanedFlightsDf.filter( \
col("price_a1").isNotNull() & \
col("price_a2").isNotNull() & \
col("price_a3").isNotNull() & \
col("price_a4").isNotNull() & \
col("price_a5").isNotNull() & \
col("price_a6").isNotNull() & \
col("price_a7").isNotNull()
)
priceChangesDf = cleanedFlightsDf \
.withColumn("price_chg_a1", col("price_a1") - col("price")) \
.withColumn("price_chg_a2", col("price_a2") - col("price")) \
.withColumn("price_chg_a3", col("price_a3") - col("price")) \
.withColumn("price_chg_a4", col("price_a4") - col("price")) \
.withColumn("price_chg_a5", col("price_a5") - col("price")) \
.withColumn("price_chg_a6", col("price_a6") - col("price")) \
.withColumn("price_chg_a7", col("price_a7") - col("price")) \
.withColumn("price_inc_a1", when(col("price_a1") > 0, 1).otherwise(0)) \
.withColumn("price_inc_a2", when(col("price_a2") > 0, 1).otherwise(0)) \
.withColumn("price_inc_a3", when(col("price_a3") > 0, 1).otherwise(0)) \
.withColumn("price_inc_a4", when(col("price_a4") > 0, 1).otherwise(0)) \
.withColumn("price_inc_a5", when(col("price_a5") > 0, 1).otherwise(0)) \
.withColumn("price_inc_a6", when(col("price_a6") > 0, 1).otherwise(0)) \
.withColumn("price_inc_a7", when(col("price_a7") > 0, 1).otherwise(0)) \
.withColumn("price_inc_gt10_a1", when(col("price_a1") > 10, 1).otherwise(0)) \
.withColumn("price_inc_gt10_a2", when(col("price_a2") > 10, 1).otherwise(0)) \
.withColumn("price_inc_gt10_a3", when(col("price_a3") > 10, 1).otherwise(0)) \
.withColumn("price_inc_gt10_a4", when(col("price_a4") > 10, 1).otherwise(0)) \
.withColumn("price_inc_gt10_a5", when(col("price_a5") > 10, 1).otherwise(0)) \
.withColumn("price_inc_gt10_a6", when(col("price_a6") > 10, 1).otherwise(0)) \
.withColumn("price_inc_gt10_a7", when(col("price_a7") > 10, 1).otherwise(0)) \
.withColumn("price_chg_a1", when(col("price_a1") > 0, col("price_a1")).otherwise(0)) \
.withColumn("price_chg_a2", when(col("price_a2") > 0, col("price_a2")).otherwise(0)) \
.withColumn("price_chg_a3", when(col("price_a3") > 0, col("price_a3")).otherwise(0)) \
.withColumn("price_chg_a4", when(col("price_a4") > 0, col("price_a4")).otherwise(0)) \
.withColumn("price_chg_a5", when(col("price_a5") > 0, col("price_a5")).otherwise(0)) \
.withColumn("price_chg_a6", when(col("price_a6") > 0, col("price_a6")).otherwise(0)) \
.withColumn("price_chg_a7", when(col("price_a7") > 0, col("price_a7")).otherwise(0)) \
.withColumn("price_chg_inc_gt10_a1", when(col("price_chg_a1") > 10, col("price_chg_a1") - 10).otherwise(0)) \
.withColumn("price_chg_inc_gt10_a2", when(col("price_chg_a2") > 10, col("price_chg_a2") - 10).otherwise(0)) \
.withColumn("price_chg_inc_gt10_a3", when(col("price_chg_a3") > 10, col("price_chg_a3") - 10).otherwise(0)) \
.withColumn("price_chg_inc_gt10_a4", when(col("price_chg_a4") > 10, col("price_chg_a4") - 10).otherwise(0)) \
.withColumn("price_chg_inc_gt10_a5", when(col("price_chg_a5") > 10, col("price_chg_a5") - 10).otherwise(0)) \
.withColumn("price_chg_inc_gt10_a6", when(col("price_chg_a6") > 10, col("price_chg_a6") - 10).otherwise(0)) \
.withColumn("price_chg_inc_gt10_a7", when(col("price_chg_a7") > 10, col("price_chg_a7") - 10).otherwise(0)) \
print("===LOG:PRICE_CHANGE_EXPLAIN===")
priceChangesDf.explain()
print("===LOG:WRITING_CLEANED===")
priceChangesDf \
.repartition("master_querydatetime", "countryName") \
.write \
.mode("overwrite") \
.partitionBy(["master_querydatetime", "countryName"]) \
.parquet("s3://xxxxxx-glue/cleanedFlights")
print("===LOG:DONE_WRITING_CLEANED===")
priceChangesDf.createOrReplaceTempView("flights")
datarobot1Df = spark.sql("""
SELECT
airportCode AS DestinationStation,
countryname AS DestinationCountryName,
agentName AS AgentName,
advance_days AS DaysInAdvance,
out_month_m AS OutboundDepartureMonth,
out_day AS OutboundDepartureDay,
out_week AS OutboundDepartureWeek,
out_hour AS OutboundDepartureHour,
out_is_longweekends AS OutboundDepartureLongWeekend,
out_is_holiday AS OutboundDepartureSchoolHoliday,
in_month_m AS InboundDepartureMonth,
in_day AS InboundDepartureDay,
in_week AS InboundDepartureWeek,
in_hour AS InboundDepartureHour,
in_is_longweekends AS InboundDepartureLongWeekend,
in_is_holiday AS InboundDepartureSchoolHoliday,
created_day AS QuoteDay,
numberoutstops AS OutboundStopsNum,
out_departure_interval AS OutboundDepartureInterval,
price_chg_inc_gt10_a7,
price AS Price
FROM flights
""")
datarobot1Df.coalesce(1) \
.write \
.mode("overwrite") \
.csv("s3://xxxxxx-glue/datarobot1/input", header = True)
job.commit()