Благодарю за помощь всех, кто может указать нам в правильном направлении.
У нас есть конфигурация пула воздушного потока, в которой мы планируем ограничить параллелизм при вызовах API в нашем mariaDB до 1, чтобы не перегружать БД. Наша главная цель - обеспечить параллелизм для задач, которые не требуют регулирования, чтобы повысить скорость нашего Daily ETL.
Для этого мы установили слоты пула для replica_db равными 1, как показано на изображении ниже
На нашей диаграмме DAG ниже мы намерены установить для первых двух задач использование слота replica_db
# #=====================================
# # JOB TASKS
# #=====================================
for TBL in LIST_OF_TABLES:
prepare_sql = bash_templateV2('prepare_sql',TBL,'replica_db')
extract_lalafood_prod_replica = bash_templateV2('extract_lalafood_prod_replica',TBL,'replica_db')
load_to_s3 = bash_templateV2('load_to_s3',TBL,'default_pool')
copy_to_dwh_staging = bash_templateV2('copy_to_dwh_staging',TBL,'default_pool')
insert_to_dwh_production = bash_templateV2('insert_to_dwh_production',TBL,'default_pool')
delete_csv = bash_templateV2('delete-csv',TBL,'default_pool')
#=====================================
# DEPENDENCY GRAPH
#=====================================
prepare_sql >> extract_lalafood_prod_replica >> load_to_s3 >> delete_csv >> copy_to_dwh_staging >> insert_to_dwh_production
, и диаграмма DAG выглядит так. В приведенном ниже примере показаны только 3 основные задачи, но на самом деле это больше, поскольку он представляет одну таблицу, которую мы извлекаем из БД. ![enter image description here](https://i.stack.imgur.com/FKTaJ.png)
Когда мы запускаем автоматизацию, мы получаем ошибку о том, что задача не выполнена из-за отсутствия слотов. Кажется, мы не можем этого понять. ![enter image description here](https://i.stack.imgur.com/N7XMQ.png)
Разве Airflow не помещает задачи в очередь? почему он помечает его как неудачный? Мы предполагаем, что поскольку мы установили только 1 слот, воздушный поток по-прежнему пытается выполнять 4 задачи за раз из-за наших настроек параллелизма. Как исправить эту проблему, чтобы первые две задачи не завершались ошибкой и ожидали в очереди, пока не откроется слот пула replica_db, прежде чем перейти к следующему узлу задачи?
Вот наши настройки dag
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'depends_on_past': False,
'start_date': STATIC_DATE,
# 'email_on_failure': True,
# 'email_on_retry': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': True,
# 'dag': dag,
'sla': timedelta(hours=2),
'execution_timeout': None,#timedelta(seconds=300),
# 'email_on_failure': True,
'email_on_retry': False,
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
dag = DAG(
dag_id = '{}__EngFlix__prod_replica_v2B__start_{}'.format(STAGE.upper(),STATIC_DATE.date()),
catchup=False,
default_args=default_args,
description='Data stream service for prod preplica to redshift',
schedule_interval=CRON_SCHEDULE,
max_active_runs=1,
concurrency=4
)
def bash_templateV2(STEP,TABLE_ID,POOL):
return BashOperator(task_id="{}-{}".format(STEP,TABLE_ID),
bash_command=EXE_PATH+"{} {} {}".format(EXECUTION_TIME_PLACEHOLDER,STEP,TABLE_ID),
pool=POOL,
dag=dag
)