Я построил лямбда-функцию для добавления заданий Spark как шагов в EMR каждый час.Однако Spark иногда дает сбой из-за временных ошибок.Если это произойдет, я хотел бы просто загрузить разрешение предыдущего часа с текущим часом, когда я добавлю следующий шаг.
Я решил эту проблему, но мне кажется, что этот код чреват слабостями и в целом, похоже, не является лучшим решением.Есть ли более надежный способ проверить, был ли последний запуск успешным, и если нет, получить диапазон дат, который он пытался загрузить?
def lambda_handler(event, context):
conn = boto3.client("emr")
clusters = conn.list_clusters()
clusters = [c["Id"] for c in clusters["Clusters"] if c["Name"] in ["Cluster Name"]]
cluster_id = clusters[0]
# Get list of steps on the cluster
steps = conn.list_steps(ClusterId = cluster_id)
# Check whether last run failed
most_recent_step = ''
last_run_successful = True
startdatetime = ''
for step in steps["Steps"]:
name = step['Name']
if (name[:10] == 'TypeOfStep'):
dt = step["Status"]["Timeline"]["StartDateTime"]
dtstr = dt.strftime('%Y-%m-%d %H:%M:%S')
if (dtstr > most_recent_step):
most_recent_step = dtstr
if (step['Status']['State'] in ['CANCEL_PENDING','CANCELLED','FAILED','INTERRUPTED']):
last_run_successful = False
startdatetime = name[13:]
# If last run was successful use the previous hour
if last_run_successful:
startdatetime = datetime.strftime(datetime.now() - timedelta(hours=6), '%Y-%m-%d %H:00:00')
enddatetime = datetime.strftime(datetime.now() - timedelta(hours=5), '%Y-%m-%d %H:00:00')
# Add step
step_args = ...
Это технически работает, но кажется, что естьВо многих случаях это может потерпеть неудачу (например, кто-то меняет одну букву имени шага, кто-то добавляет шаг с аналогичным именем, который не подходит для произвольного диапазона дат и приводит к сбою в будущем)
Спасибо!
РЕДАКТИРОВАТЬ: я должен отметить, что имена шагов имеют формат «TypeOfStep 2019-01-02 03:00:00», где дата-время в конце является датой начала и времени выше.Но я могу изменить это, если есть лучшее решение.