Я только что установил Cloud Composer Environment в Python 3 и версии образа Composer composer-1.4.0-airflow-1.10.0
.Все настройки в остальном «стоковые»;то есть без переопределения конфигурации.
Я пытаюсь протестировать чрезвычайно простой DAG.Он работает без проблем на моем локальном сервере Airflow, но в Cloud Composer представление информации о задаче веб-сервера имеет сообщение Dependencies Blocking Task From Getting Scheduled
Зависимости Unknown
по следующей причине:
All dependencies are met but the task instance is not running. In most cases this just means that the task will probably be scheduled soon unless:
- The scheduler is down or under heavy load
- The following configuration values may be limiting the number of queueable processes: parallelism, dag_concurrency, max_active_dag_runs_per_dag, non_pooled_task_slot_count
If this task instance does not start soon please contact your Airflow administrator for assistance.
Это происходит независимо от того, выполняется ли задача по расписанию или когда я вручную запускаю ее на веб-сервере (перед тем, как это сделать, я установил все экземпляры задачи, чтобы избежать задержек).Я попытался сбросить планировщик в kubernetes согласно этому ответу , но задачи все еще застряли в запланированном.
Кроме того, я заметил, что в моем локальном экземпляре (при запуске сервера, работника и планировщика в разных контейнерах Docker) столбец Hostname
в представлении «Экземпляры задач» заполнен, но в Cloud Composer он не't.
Вот DAG, которую я использую:
from datetime import datetime, timedelta
import random
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'queue': 'airflow',
'start_date': datetime.today() - timedelta(days=2),
'schedule_interval': None,
'retries': 2,
'retry_delay': timedelta(seconds=15),
'priority_weight': 10,
}
example_dag = DAG(
'example_dag',
default_args=default_args,
schedule_interval=timedelta(days=1)
)
def always_succeed():
pass
always_succeed_operator = PythonOperator(
dag=example_dag,
python_callable=always_succeed,
task_id='always_succeed'
)
def might_fail():
return 1 / random.randint(0, 1)
might_fail_operator = PythonOperator(
dag=example_dag, python_callable=might_fail, task_id='might_fail'
)
might_fail_operator.set_upstream(always_succeed_operator)