Сбой задачи Cloud Composer Airflow, но функции завершены успешно - PullRequest
0 голосов
/ 28 февраля 2019

Я пытаюсь написать свое первое задание Airflow с помощью Cloud Composer.Моя группа обеспечения доступности баз данных имеет три задачи, первая успешно завершена, но вторая задача, по-видимому, завершается с ошибкой при отправке любого сообщения об ошибке.Я использую PythonOperator во втором задании.Вызываемая функция выполняет длительный запрос и опрашивает, пока запрос не будет завершен.Как только запрос завершен, я получаю сообщение о том, что данные были выведены в правильную таблицу, но затем Airflow рассматривает задачу как сбойную и повторяет попытку.

Мой default_args для DAG выглядит следующим образом:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': today.strftime("%Y-%m-%d"),
    'email': ['email@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'dagrun_timeout': timedelta(minutes=30)
}

РЕДАКТИРОВАТЬ:

Вот мой вызываемый Python и PythonOperator.вызываемый run_query выводит данные в журналы Stackdriver и указывает, что фактическая функция завершается, но задача не выполняется.

def run_query(**kwargs):
    ti = kwargs['ti']
    creds = ti.xcom_pull(key='key value 1', task_ids=t1_id)
    service = adh.get_service(creds)

    return adh.start_saved_query(service,
                                 kwargs['customer_id'],
                                 kwargs['query_name'],
                                 kwargs['start_date'],
                                 kwargs['end_date'],
                                 kwargs['project'],
                                 kwargs['dataset'],
                                 kwargs['table'],
                                 parameters=kwargs['parameters'])
run_adh_query = PythonOperator(
    task_id="task2",
    provide_context=True,
    python_callable=run_query,
    dag=dag,
    trigger_rule='all_success',
    op_kwargs={
        'customer_id': 01234,
        'query_name': 'queryName',
        'start_date': start_date.strftime("%Y-%m-%d"),
        'end_date': end_date.strftime("%Y-%m-%d"),
        'project': adh_project,
        'dataset': adh_dataset,
        'table': adh_table,
        'parameters': {
        'CONV_START_DATE': {'value': conv_start_date.strftime("%Y-%m-%d")},
        'CONV_END_DATE': {'value': end_date.strftime("%Y-%m-%d")},
        'LOOKBACK_DAYS': {'value': str(lookback_days)}
        }
    }
)

Я был бы очень признателен за любые советы!

1 Ответ

0 голосов
/ 01 марта 2019

Я не вижу обработки ошибок в вашем коде.

Если длительный запрос и опросы не пройдены, вызовите AirflowException, это приведет к немедленному переходу задачи в состояние сбоя.

from airflow import AirflowException

ValueError можно использовать для сбоя и повторите попытку

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...