DummyOperator помечен как upstream_failed, но все вышестоящие задачи отмечены как успешные - PullRequest
0 голосов
/ 13 февраля 2019

У меня есть конвейер Airflow, который создает 12 промежуточных таблиц из файлов Google Cloud Storage, а затем выполняет некоторую последующую обработку.У меня есть DummyOperator для сбора этих задач перед переходом к следующим этапам.

Я получаю сообщение об ошибке в операторе wait_stg_load о том, что он находится в состоянии upstream_failed.Однако все вышеперечисленные задачи отмечены как успешные.Сама группа DAG теперь помечена как неудачная.Если я очищаю статус на wait_stg_load, все идет хорошо.Любые идеи о том, что я делаю неправильно?

Я использую Google Cloud Composer, который является версией Airflow v 1.9 на Python 3

enter image description here

with DAG('load_data',
    default_args=default_args,
    schedule_interval='0 9 * * *',
    concurrency=3
) as dag:

    t2 = DummyOperator(
        task_id='wait_stg_load',
        dag=dag
    )

    for t in tables:
        t1 = GoogleCloudStorageToBigQueryOperator(
            task_id='load_stg_{}'.format(t.replace('.','_')),
            bucket='my-bucket',
            source_objects=['data/{}.json'.format(t)],
            destination_project_dataset_table='{}.stg_{}'.format(DATASET_NAME, t.replace('.','_')),
            schema_object='data/schemas/{}.json'.format(t),
            source_format='NEWLINE_DELIMITED_JSON',
            write_disposition='WRITE_TRUNCATE',
            dag=dag
        )
        t1 >> t2

Airflow Diagram

Обновление 1

Я считаю, что это проблема параллелизма в Airflow.Я заметил, что задание действительно в какой-то момент терпит неудачу, но потом все равно выполняется.Он помечается как завершенный, но DummyOperator этого не видит.

[2019-02-14 09:00:14,734] {cli.py:374} INFO - Running on host airflow-worker
[2019-02-14 09:00:16,686] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dag.task 2019-02-13 09:00:00 [queued]>
[2019-02-14 09:00:16,694] {models.py:1189} INFO - Dependencies not met for <TaskInstance: dag.task 2019-02-13 09:00:00 [queued]>, dependency 'Task Instance Slots Available' FAILED: The maximum number of running tasks (3) for this task's DAG 'dag' has been reached.
[2019-02-14 09:00:16,694] {models.py:1389} WARNING -
-------------------------------------------------------------------------------
FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 1. State set to NONE
-------------------------------------------------------------------------------

[2019-02-14 09:00:16,694] {models.py:1392} INFO - Queuing into pool None
[2019-02-14 09:00:26,619] {cli.py:374} INFO - Running on host airflow-worker
[2019-02-14 09:00:28,563] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dag.task 2019-02-13 09:00:00 [failed]>
[2019-02-14 09:00:28,570] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dag.task 2019-02-13 09:00:00 [failed]>
[2019-02-14 09:00:28,570] {models.py:1406} INFO -
-------------------------------------------------------------------------------
Starting attempt 1 of 
-------------------------------------------------------------------------------

[2019-02-14 09:00:28,607] {models.py:1427} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): task> on 2019-02-13 09:00:00
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...