Apache Airflow Xcom Pull из динамического имени задачи - PullRequest
0 голосов
/ 25 октября 2018

Я успешно создал динамические задачи в DAG (операторы Bash и Docker), но мне трудно передавать эти динамически созданные задачи в xcom_pull для получения данных.

for i in range(0, max_tasks):
    task_scp_queue = BashOperator(task_id="scp_queue_task_{}".format(i), bash_command="""python foo""", retries=3, dag=dag, pool="scp_queue_pool", queue="foo", provide_context=True, xcom_push=True) # Pull the manifest ID from the previous task via xcom'

    task_process_queue = DockerOperator(task_id="process_task_{}".format(i), command="""python foo --queue-name={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i), retries=3, dag=dag, pool="process_pool", api_version="auto", image="foo", queue="foo", execution_timeout=timedelta(minutes=5))
    task_manifest = DockerOperator(api_version="auto", task_id="manifest_task_{}".format(i), image="foo", retries=3, dag=dag, command=""" python --manifestid={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i), pool="manfiest_pool", queue="d_parser")

    task_psql_queue.set_downstream(task_scp_queue)
    task_process_queue.set_upstream(task_scp_queue)
    task_manifest.set_upstream(task_process_queue)

Как вы можете видетьЯ попытался просто использовать строку формата Python в шаблоне Jinja, чтобы передать в него переменную i, однако это не сработало.

Я также попытался использовать "task.task_id" и создать новую строку столько task_id, но это тоже не работает.

Редактировать:

Теперь команда выглядит следующим образом

command="""python foo \ 
    --queue-name="{{ 
    task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}" 
     """.format(i)

И мои журналы отладки изВоздушный поток выглядит как

Using Master Queue: process_{ 
task_instance.xcom_pull(task_ids='scp_queue_task_31') }

Таким образом, строковое значение заполняется, но не выполняет xcom_pull.

1 Ответ

0 голосов
/ 26 октября 2018

Я запутался, как это не работает.Журнал ошибок, которые вы получаете, был бы полезен.

Вкратце, то, что вы делаете, выглядит хорошо, если max_tasks=2 вы получите:

task_psql_queue.taskid --> scp_queue_task_0 >> process_task_0 >> manifest_task_0
                       \-> scp_queue_task_1 >> process_task_1 >> manifest_task_1

Я подозреваю васне нужны тайм-ауты, которые очень короткие.Поскольку у вас очень длинные строки и случайное изменение порядка именованных параметров, я переформатирую то, что вы написали:

for i in range(0, max_tasks):
    task_scp_queue = BashOperator(
        task_id="scp_queue_task_{}".format(i),
        dag=dag,
        retries=3,  # you could make it a default arg on the dag
        pool="scp_queue_pool",
        queue="foo", # you really want both queue and pool? When debugging remove them.
        bash_command="python foo",  # Maybe you snipped a multiline command
        provide_context=True,  # BashOp doesn't have this argument
        xcom_push=True,  # PUSH the manifest ID FOR the NEXT task via xcom
    )

    task_process_queue = DockerOperator(
        task_id="process_task_{}".format(i),
        dag=dag,
        retries=3,
        pool="process_pool",
        queue="foo",
        execution_timeout=timedelta(minutes=5),
        api_version="auto",
        image="foo",
        command="python foo --queue-name="
                "{{{{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}}}".format(i),
    )

    task_manifest = DockerOperator(
        task_id="manifest_task_{}".format(i),
        retries=3,
        dag=dag,
        pool="manfiest_pool",
        queue="d_parser",
        api_version="auto",
        image="foo",
        command="python --manifestid="
                "{{{{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}}}".format(i),
    )

    task_psql_queue >> task_scp_queue >> task_process_queue >> task_manifest

О, теперь посмотрите, вы не передали task_ids как строки.Попробуйте:

        command="python foo --queue-name="
                "{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i),
… … …
        command="python --manifestid="
                "{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i),
...