Как извлечь yarn_application_id из SparkSubmitHook? Я пытался использовать пользовательский оператор и свойство task_instance, но, думаю, что-то упустил...
def task_failure_callback(context):
    """Airflow on-failure callback: read the YARN application id off the
    failed task's operator.

    NOTE(review): whether ``yarn_application_id`` actually exists on the
    operator depends on the custom hook/operator wiring — confirm.
    """
    ti = context.get('task_instance')
    # Need to access yarn_application_id here
    spark_operator = ti.operator
    application_id = spark_operator.yarn_application_id
    return ...
# DAG-level defaults: every task inherits the failure callback, which
# receives the task context on failure.
default_args = dict(
    start_date=...,
    on_failure_callback=task_failure_callback,
)
# DAG definition; DAG_ID / CATCHUP / SCHEDULE_INTERVAL are module-level
# constants defined elsewhere in the file (not shown in this snippet).
with DAG(DAG_ID, default_args=default_args, catchup=CATCHUP, schedule_interval=SCHEDULE_INTERVAL) as dag:
    ...
Поэтому я попытался добавить его как новую пару ключ-значение в словарь context, но без успеха...
class CustomSparkSubmitHook(SparkSubmitHook, LoggingMixin):
    """SparkSubmitHook subclass that exposes the YARN application id and
    accepts the Airflow task ``context`` at submit time, in an attempt to
    make the id reachable from an on_failure_callback.
    """

    def __init__(self, ...):
        super().__init__(...)

    def submit_with_context(self, context, application="", **kwargs):
        """Submit the Spark application.

        ``context`` is the Airflow task context dict; ``application`` is the
        path of the application to submit. Bodies elided (``...``) in the
        original snippet.
        """
        # Build spark submit cmd
        ...
        # Run cmd as subprocess
        ...
        # Process spark submit log
        ...
        # Check spark-submit return code. In Kubernetes mode, also check the value
        # of exit code in the log, as it may differ.
        ...
        # We want the Airflow job to wait until the Spark driver is finished
        if self._should_track_driver_status:
            if self._driver_id is None:
                raise AirflowException(
                    "No driver id is known: something went wrong when executing " +
                    "the spark submit command"
                )
            # We start with the SUBMITTED status as initial status
            self._driver_status = "SUBMITTED"
            # Trying to export yarn_application_id unsuccessfully
            # NOTE(review): mutating this dict only changes the local mapping
            # passed into this call; the failure callback receives a context
            # built elsewhere by Airflow, so the new key is presumably not
            # visible there — XCom (task_instance.xcom_push) looks like the
            # intended channel. TODO confirm.
            context['yarn_application_id'] = self.yarn_application_id
            # Start tracking the driver status (blocking function)
            ...

    @property
    def yarn_application_id(self):
        # Backing attribute is presumably populated while parsing the
        # spark-submit log (not shown in this snippet) — TODO confirm.
        return self._yarn_application_id