Airflow получает атрибут оператора из обратного вызова контекста - PullRequest
0 голосов
/ 05 ноября 2019

Как извлечь yarn_application_id из SparkSubmitHook? Я пытался использовать пользовательский оператор и свойство task_instance, но, думаю, что-то пропустил ...

def task_failure_callback(context):
    task_instance = context.get('task_instance')  # Need to access yarn_application_id here
    operator = task_instance.operator
    application_id = operator.yarn_application_id
    return ...

default_args = {
    'start_date': ...,
    'on_failure_callback': task_failure_callback
}

with DAG(DAG_ID, default_args=default_args, catchup=CATCHUP, schedule_interval=SCHEDULE_INTERVAL) as dag:
    ...

Поэтому я попытался добавить его в качестве нового значения ключа в диктовке context, нобез успеха ...

class CustomSparkSubmitHook(SparkSubmitHook, LoggingMixin):
    def __init__(self, ...):
        super().__init__(...)

    def submit_with_context(self, context, application="", **kwargs):
        # Build spark submit cmd
        ...
        # Run cmd as subprocess
        ...
        # Process spark submit log
        ...
        # Check spark-submit return code. In Kubernetes mode, also check the value
        # of exit code in the log, as it may differ.
        ...

        # We want the Airflow job to wait until the Spark driver is finished
        if self._should_track_driver_status:
            if self._driver_id is None:
                raise AirflowException(
                    "No driver id is known: something went wrong when executing " +
                    "the spark submit command"
                )

            # We start with the SUBMITTED status as initial status
            self._driver_status = "SUBMITTED"

            # Trying to export yarn_application_id unsuccessfully
            context['yarn_application_id'] = self.yarn_application_id

            # Start tracking the driver status (blocking function)
            ...

    @property
    def yarn_application_id(self):
        return self._yarn_application_id
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...