Задачи Airflow не работают с пулом 1 и параллелизмом 4 - PullRequest
1 голос
/ 07 мая 2020

Благодарю за помощь всех, кто может указать нам в правильном направлении.

У нас есть конфигурация пула воздушного потока, в которой мы планируем ограничить параллелизм при вызовах API в нашем mariaDB до 1, чтобы не перегружать БД. Наша главная цель - обеспечить параллелизм для задач, которые не требуют регулирования, чтобы повысить скорость нашего Daily ETL.

Для этого мы установили слоты пула для replica_db равными 1, как показано на изображении ниже enter image description here На нашей диаграмме DAG ниже мы намерены установить для первых двух задач использование слота replica_db

# #=====================================
# # JOB TASKS
# #=====================================

for TBL in LIST_OF_TABLES:
    prepare_sql                         =       bash_templateV2('prepare_sql',TBL,'replica_db')
    extract_lalafood_prod_replica       =       bash_templateV2('extract_lalafood_prod_replica',TBL,'replica_db')
    load_to_s3                          =       bash_templateV2('load_to_s3',TBL,'default_pool')
    copy_to_dwh_staging                 =       bash_templateV2('copy_to_dwh_staging',TBL,'default_pool')
    insert_to_dwh_production            =       bash_templateV2('insert_to_dwh_production',TBL,'default_pool')
    delete_csv                          =       bash_templateV2('delete-csv',TBL,'default_pool')

    #=====================================
    # DEPENDENCY GRAPH
    #=====================================

    prepare_sql >> extract_lalafood_prod_replica >> load_to_s3  >> delete_csv >> copy_to_dwh_staging >> insert_to_dwh_production

, и диаграмма DAG выглядит так. В приведенном ниже примере показаны только 3 основные задачи, но на самом деле это больше, поскольку он представляет одну таблицу, которую мы извлекаем из БД. enter image description here

Когда мы запускаем автоматизацию, мы получаем ошибку о том, что задача не выполнена из-за отсутствия слотов. Кажется, мы не можем этого понять. enter image description here

Разве Airflow не помещает задачи в очередь? почему он помечает его как неудачный? Мы предполагаем, что поскольку мы установили только 1 слот, воздушный поток по-прежнему пытается выполнять 4 задачи за раз из-за наших настроек параллелизма. Как исправить эту проблему, чтобы первые две задачи не завершались ошибкой и ожидали в очереди, пока не откроется слот пула replica_db, прежде чем перейти к следующему узлу задачи?

Вот наши настройки dag

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'depends_on_past': False,
    'start_date': STATIC_DATE,
#     'email_on_failure': True,
#     'email_on_retry': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': True,    
    # 'dag': dag,
    'sla': timedelta(hours=2),
    'execution_timeout': None,#timedelta(seconds=300),
#    'email_on_failure': True,
    'email_on_retry': False,
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

dag = DAG(
    dag_id = '{}__EngFlix__prod_replica_v2B__start_{}'.format(STAGE.upper(),STATIC_DATE.date()),
    catchup=False,
    default_args=default_args,
    description='Data stream service for prod preplica to redshift',
    schedule_interval=CRON_SCHEDULE,
    max_active_runs=1,
    concurrency=4
)

def bash_templateV2(STEP,TABLE_ID,POOL):
  return BashOperator(task_id="{}-{}".format(STEP,TABLE_ID),
                      bash_command=EXE_PATH+"{} {} {}".format(EXECUTION_TIME_PLACEHOLDER,STEP,TABLE_ID),
                      pool=POOL,
                      dag=dag
                      )
...