У меня есть конвейер Airflow, который создает 12 промежуточных таблиц из файлов Google Cloud Storage, а затем выполняет некоторую последующую обработку.У меня есть DummyOperator для сбора этих задач перед переходом к следующим этапам.
Я получаю сообщение об ошибке в операторе wait_stg_load о том, что он находится в состоянии upstream_failed.Однако все вышеперечисленные задачи отмечены как успешные.Сама группа DAG теперь помечена как неудачная.Если я очищаю статус на wait_stg_load
, все идет хорошо.Любые идеи о том, что я делаю неправильно?
Я использую Google Cloud Composer, который является версией Airflow v 1.9 на Python 3
![enter image description here](https://i.stack.imgur.com/FO9qa.png)
with DAG('load_data',
default_args=default_args,
schedule_interval='0 9 * * *',
concurrency=3
) as dag:
t2 = DummyOperator(
task_id='wait_stg_load',
dag=dag
)
for t in tables:
t1 = GoogleCloudStorageToBigQueryOperator(
task_id='load_stg_{}'.format(t.replace('.','_')),
bucket='my-bucket',
source_objects=['data/{}.json'.format(t)],
destination_project_dataset_table='{}.stg_{}'.format(DATASET_NAME, t.replace('.','_')),
schema_object='data/schemas/{}.json'.format(t),
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_TRUNCATE',
dag=dag
)
t1 >> t2
![Airflow Diagram](https://i.stack.imgur.com/8vjZW.png)
Обновление 1
Я считаю, что это проблема параллелизма в Airflow.Я заметил, что задание действительно в какой-то момент терпит неудачу, но потом все равно выполняется.Он помечается как завершенный, но DummyOperator этого не видит.
[2019-02-14 09:00:14,734] {cli.py:374} INFO - Running on host airflow-worker
[2019-02-14 09:00:16,686] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dag.task 2019-02-13 09:00:00 [queued]>
[2019-02-14 09:00:16,694] {models.py:1189} INFO - Dependencies not met for <TaskInstance: dag.task 2019-02-13 09:00:00 [queued]>, dependency 'Task Instance Slots Available' FAILED: The maximum number of running tasks (3) for this task's DAG 'dag' has been reached.
[2019-02-14 09:00:16,694] {models.py:1389} WARNING -
-------------------------------------------------------------------------------
FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 1. State set to NONE
-------------------------------------------------------------------------------
[2019-02-14 09:00:16,694] {models.py:1392} INFO - Queuing into pool None
[2019-02-14 09:00:26,619] {cli.py:374} INFO - Running on host airflow-worker
[2019-02-14 09:00:28,563] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dag.task 2019-02-13 09:00:00 [failed]>
[2019-02-14 09:00:28,570] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dag.task 2019-02-13 09:00:00 [failed]>
[2019-02-14 09:00:28,570] {models.py:1406} INFO -
-------------------------------------------------------------------------------
Starting attempt 1 of
-------------------------------------------------------------------------------
[2019-02-14 09:00:28,607] {models.py:1427} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): task> on 2019-02-13 09:00:00