Интеграционные тесты в Airflow: как запустить тесты для задачи с состоянием «выполняется»? - PullRequest
0 голосов
/ 28 января 2020

Я пишу интеграционные тесты для специального оператора Airflow. Этот пользовательский оператор проверяет состояние определенных задач, схожих с ExternalTaskSensor.

в Airflow. В рамках моих тестов мне нужно создать задачу, которая имеет статус «выполняется» в Airflow. Как я могу это сделать? Мои предыдущие попытки не работают:

from airflow.utils.state import State

dag = DAG(dag_id="test_dag")
task = DummyOperator(
    dag=dag,
    task_id="task_running",
    start_date=datetime(2020, 1, 1, 12),
    schedule_interval="@daily"
)
ti = TaskInstance(task=task, execution_date=datetime(2020, 1, 1))
task.execute(ti.get_template_context())

# Update task state to 'running'
ti.set_state(state=State.RUNNING)
# ^ this change of state doesn't persist in Airflow's TaskInstance!

with pytest.raises(AirflowSensorTimeout):
    # perform test, which should raise AirflowSensorTimeout for a running task

Здесь ti["state"] возвращает "running". Тем не менее, состояние задачи не обновляется в TaskInstance Airflow. Поэтому, когда я проверяю состояние задачи в TaskInstance, вместо нее возвращается "success", и я не могу запустить свой тест.

Как я могу запустить тесты для задачи с состоянием "running"?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...