Как обрабатывать ошибки в сопоставленных функциях в AWS Glue? - PullRequest
0 голосов
/ 24 мая 2018

Я использую метод map DynamicFrame (или, что эквивалентно, метод Map.apply).Я заметил, что любые ошибки в функции, которые я передаю этим функциям, автоматически игнорируются и приводят к тому, что возвращаемый DynamicFrame становится пустым.

Скажем, у меня есть сценарий задания, подобный этому:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *

glueContext = GlueContext(SparkContext.getOrCreate())
dyF = glueContext.create_dynamic_frame.from_catalog(database="radixdemo", table_name="census_csv")

def my_mapper(rec):
    import logging
    logging.error("[RADIX] An error-log from in the mapper!")
    print "[RADIX] from in the mapper!"
    raise Exception("[RADIX] A bug!")
dyF = dyF.map(my_mapper, 'my_mapper')

print "Count:  ", dyF.count()
dyF.printSchema()
dyF.toDF().show()

Если я запускаю этот скрипт в моей конечной точке Glue Dev с gluepython, я получаю вывод, подобный этому:

[glue@ip-172-31-83-196 ~]$ gluepython gluejob.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/05/23 20:56:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Count:   0
root

++
||
++
++

Примечания по этому выводу:

  • Я несм. результат оператора print или оператора logging.error.
  • Нет никаких признаков того, что my_mapper вызвал исключение.
  • Вызов printSchema показывает, что нетметаданные схемы в созданном DynamicFrame
  • метод show также не производит никакого вывода, указывая, что все строки пропали.

Аналогично, когда я сохраняю этот скрипт какзадание в консоли AWS Glue, и запустите его, задание не указывает на возникшую ошибку - статус задания «Успешно».Примечательно, что я do получаю операторы print и logging.error вызовы, выводимые в журналы заданий, но только в обычных "Журналах", а не в "Журналах ошибок".

ЧтоЯ хочу, чтобы иметь возможность указать, что моя работа не удалось, и чтобы можно было легко найти эти журналы ошибок.Самое важное - просто указать, что произошел сбой.

Есть ли способ зарегистрировать ошибку в отображенной функции таким образом, чтобы Glue выбрал ее как «Журнал ошибок» (и поместил вчто отдельный путь AWS CloudWatch Logs)?Если это произойдет, будет ли он автоматически помечать всю работу как отказавшую?Или есть какой-то другой способ явного сбоя задания из отображенной функции?

(мой план, если есть способ зарегистрировать ошибки и / или пометить задание как неудачное, это создать декоратор илидругая служебная функция, которая будет автоматически перехватывать исключения в моих отображенных функциях и обеспечивать их регистрацию и пометку как сбой).

1 Ответ

0 голосов
/ 21 июля 2018

Единственный способ, которым я обнаружил, чтобы задание Glue отображалось как «Неудачный», - это вызвать исключение из основного скрипта (, а не внутри функции сопоставления или фильтра, так как те, кажется, вращаютсяк блокам обработки данных).

К счастью, - это способ определить, произошло ли исключение внутри карты или функции фильтра: используя метод DynamicFrame.stageErrorsCount().Он вернет число, указывающее, сколько исключений было сгенерировано при выполнении самого последнего преобразования.

Таким образом, правильный способ решения всех проблем:

  • убедитесь, что ваша карта или функция преобразования явно регистрирует любые исключения, которые происходят внутри него.Лучше всего это сделать, используя функцию декоратора или какой-то другой механизм многократного использования, вместо того, чтобы полагаться на операторы try/except в каждой написанной вами функции.
  • после каждого преобразования, в которое вы хотите перехватывать ошибки,вызовите метод stageErrorsCount() и убедитесь, что он больше 0. Если вы хотите прервать задание, просто вызовите исключение.

Например:

import logging

def log_errors(inner):
    def wrapper(*args, **kwargs):
        try:
            inner(*args, **kwargs)
        except Exception as e:
            logging.exception('Error in function: {}'.format(inner))
            raise
    return wrapper

@log_errors
def foo(record):
    1 / 0

Затем,внутри вашей работы вы бы сделали что-то вроде:

df = df.map(foo, "foo")
if df.stageErrorsCount() > 0:
    raise Exception("Error in job! See the log!")

Обратите внимание, что даже вызов logging.exception изнутри функции mapper по-прежнему не записывает журналы в журнал error Журналы AWS CloudWatch, по некоторым причинам.Это записывается в обычные журналы успеха.Однако с помощью этой техники вы по крайней мере увидите, что задание не выполнено, и сможете найти информацию в журналах.Еще одно предостережение: конечные точки разработчиков, похоже, не показывают ЛЮБЫЕ логи из функций картографирования или фильтрации.

...