Я запутался, как это не работает.Журнал ошибок, которые вы получаете, был бы полезен.
Вкратце, то, что вы делаете, выглядит хорошо, если max_tasks=2
вы получите:
task_psql_queue.taskid --> scp_queue_task_0 >> process_task_0 >> manifest_task_0
\-> scp_queue_task_1 >> process_task_1 >> manifest_task_1
Я подозреваю васне нужны тайм-ауты, которые очень короткие.Поскольку у вас очень длинные строки и случайное изменение порядка именованных параметров, я переформатирую то, что вы написали:
for i in range(0, max_tasks):
task_scp_queue = BashOperator(
task_id="scp_queue_task_{}".format(i),
dag=dag,
retries=3, # you could make it a default arg on the dag
pool="scp_queue_pool",
queue="foo", # you really want both queue and pool? When debugging remove them.
bash_command="python foo", # Maybe you snipped a multiline command
provide_context=True, # BashOp doesn't have this argument
xcom_push=True, # PUSH the manifest ID FOR the NEXT task via xcom
)
task_process_queue = DockerOperator(
task_id="process_task_{}".format(i),
dag=dag,
retries=3,
pool="process_pool",
queue="foo",
execution_timeout=timedelta(minutes=5),
api_version="auto",
image="foo",
command="python foo --queue-name="
"{{{{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}}}".format(i),
)
task_manifest = DockerOperator(
task_id="manifest_task_{}".format(i),
retries=3,
dag=dag,
pool="manfiest_pool",
queue="d_parser",
api_version="auto",
image="foo",
command="python --manifestid="
"{{{{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}}}".format(i),
)
task_psql_queue >> task_scp_queue >> task_process_queue >> task_manifest
О, теперь посмотрите, вы не передали task_ids
как строки.Попробуйте:
command="python foo --queue-name="
"{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i),
… … …
command="python --manifestid="
"{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i),