значение пула воздушного потока от xcom во время выполнения - PullRequest
0 голосов
/ 07 апреля 2020

Существует три пула low, medium и high, и для них настроены слоты 1, 2 и 3.

Теперь task1 установите значение xcom на pool основе по некоторым подсчетам и task2 должно быть запланировано в этом пуле.

Пример кода -

dag = DAG("mongo-connection-test")

def test(**kwargs):
    # some condition to set pool value
    pool = "high" 
    kwargs['ti'].xcom_push(key="pool", value=pool)

task1 = PythonOperator(task_id="set_xcom",
                       python_callable=test,
                       xcom_push=True, 
                       provide_context=True,
                       dag=dag)

task2 = BashOperator(
                   task_id="test",
                   bash_command="echo Hello !!",
                   dag=dag,
                   pool='{{ ti.xcom_pull(task_ids="set_xcom", key="pool") }}',
                   provide_context=True)
task1 >> task2

Но задача 2 не может получить значение из xcom, и планировщик не может выполнить команду без указания пула exist '{{ ti.xcom_pull(task_ids="test", key="pool") }}'. Нужна помощь в устранении этого?

1 Ответ

0 голосов
/ 08 апреля 2020

Рекомендую прочитать документацию о Jinja Templating in Airflow . Если вы хотите его использовать, попробуйте сначала взглянуть на template_fields в исходном коде (например, BashOperator )

Поскольку pool нет в этом списке, то, что вы пытаетесь делать не получится.

Поэтому я бы порекомендовал вам создать задачу 3 раза с 3 различными настройками пула и использовать BranchPythonOperator, чтобы выбрать, какая из 3 задач должна выполняться (вы можете потянуть из XCom в BranchPythonOperator)

Ваш DAG, вероятно, должен выглядеть следующим образом:

task1 >> branch_task >> [task2_low, task2_medium, task2_high]
...