Существует три пула low
, medium
и high
, и для них настроены слоты 1, 2 и 3.
Теперь task1
установите значение xcom на pool
основе по некоторым подсчетам и task2
должно быть запланировано в этом пуле.
Пример кода -
dag = DAG("mongo-connection-test")
def test(**kwargs):
# some condition to set pool value
pool = "high"
kwargs['ti'].xcom_push(key="pool", value=pool)
task1 = PythonOperator(task_id="set_xcom",
python_callable=test,
xcom_push=True,
provide_context=True,
dag=dag)
task2 = BashOperator(
task_id="test",
bash_command="echo Hello !!",
dag=dag,
pool='{{ ti.xcom_pull(task_ids="set_xcom", key="pool") }}',
provide_context=True)
task1 >> task2
Но задача 2 не может получить значение из xcom, и планировщик не может выполнить команду без указания пула exist '{{ ti.xcom_pull(task_ids="test", key="pool") }}'
. Нужна помощь в устранении этого?