Я пишу интеграционные тесты для специального оператора Airflow. Этот пользовательский оператор проверяет состояние определенных задач, схожих с ExternalTaskSensor
.
в Airflow. В рамках моих тестов мне нужно создать задачу, которая имеет статус «выполняется» в Airflow. Как я могу это сделать? Мои предыдущие попытки не работают:
from airflow.utils.state import State
dag = DAG(dag_id="test_dag")
task = DummyOperator(
dag=dag,
task_id="task_running",
start_date=datetime(2020, 1, 1, 12),
schedule_interval="@daily"
)
ti = TaskInstance(task=task, execution_date=datetime(2020, 1, 1))
task.execute(ti.get_template_context())
# Update task state to 'running'
ti.set_state(state=State.RUNNING)
# ^ this change of state doesn't persist in Airflow's TaskInstance!
with pytest.raises(AirflowSensorTimeout):
# perform test, which should raise AirflowSensorTimeout for a running task
Здесь ti["state"]
возвращает "running"
. Тем не менее, состояние задачи не обновляется в TaskInstance
Airflow. Поэтому, когда я проверяю состояние задачи в TaskInstance
, вместо нее возвращается "success"
, и я не могу запустить свой тест.
Как я могу запустить тесты для задачи с состоянием "running"
?