Я пытаюсь написать свое первое задание Airflow с помощью Cloud Composer.Моя группа обеспечения доступности баз данных имеет три задачи, первая успешно завершена, но вторая задача, по-видимому, завершается с ошибкой при отправке любого сообщения об ошибке.Я использую PythonOperator
во втором задании.Вызываемая функция выполняет длительный запрос и опрашивает, пока запрос не будет завершен.Как только запрос завершен, я получаю сообщение о том, что данные были выведены в правильную таблицу, но затем Airflow рассматривает задачу как сбойную и повторяет попытку.
Мой default_args
для DAG выглядит следующим образом:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': today.strftime("%Y-%m-%d"),
'email': ['email@email.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'dagrun_timeout': timedelta(minutes=30)
}
РЕДАКТИРОВАТЬ:
Вот мой вызываемый Python и PythonOperator.вызываемый run_query
выводит данные в журналы Stackdriver и указывает, что фактическая функция завершается, но задача не выполняется.
def run_query(**kwargs):
ti = kwargs['ti']
creds = ti.xcom_pull(key='key value 1', task_ids=t1_id)
service = adh.get_service(creds)
return adh.start_saved_query(service,
kwargs['customer_id'],
kwargs['query_name'],
kwargs['start_date'],
kwargs['end_date'],
kwargs['project'],
kwargs['dataset'],
kwargs['table'],
parameters=kwargs['parameters'])
run_adh_query = PythonOperator(
task_id="task2",
provide_context=True,
python_callable=run_query,
dag=dag,
trigger_rule='all_success',
op_kwargs={
'customer_id': 01234,
'query_name': 'queryName',
'start_date': start_date.strftime("%Y-%m-%d"),
'end_date': end_date.strftime("%Y-%m-%d"),
'project': adh_project,
'dataset': adh_dataset,
'table': adh_table,
'parameters': {
'CONV_START_DATE': {'value': conv_start_date.strftime("%Y-%m-%d")},
'CONV_END_DATE': {'value': end_date.strftime("%Y-%m-%d")},
'LOOKBACK_DAYS': {'value': str(lookback_days)}
}
}
)
Я был бы очень признателен за любые советы!