Airflow DAGs не работают в Google Cloud Composer: «Задача блокировки зависимостей от планирования» - PullRequest
0 голосов
/ 20 декабря 2018

Я только что установил Cloud Composer Environment в Python 3 и версии образа Composer composer-1.4.0-airflow-1.10.0.Все настройки в остальном «стоковые»;то есть без переопределения конфигурации.

Я пытаюсь протестировать чрезвычайно простой DAG.Он работает без проблем на моем локальном сервере Airflow, но в Cloud Composer представление информации о задаче веб-сервера имеет сообщение Dependencies Blocking Task From Getting Scheduled

Зависимости Unknown по следующей причине:

All dependencies are met but the task instance is not running. In most cases this just means that the task will probably be scheduled soon unless:
- The scheduler is down or under heavy load
- The following configuration values may be limiting the number of queueable processes: parallelism, dag_concurrency, max_active_dag_runs_per_dag, non_pooled_task_slot_count

If this task instance does not start soon please contact your Airflow administrator for assistance.

Это происходит независимо от того, выполняется ли задача по расписанию или когда я вручную запускаю ее на веб-сервере (перед тем, как это сделать, я установил все экземпляры задачи, чтобы избежать задержек).Я попытался сбросить планировщик в kubernetes согласно этому ответу , но задачи все еще застряли в запланированном.

Кроме того, я заметил, что в моем локальном экземпляре (при запуске сервера, работника и планировщика в разных контейнерах Docker) столбец Hostname в представлении «Экземпляры задач» заполнен, но в Cloud Composer он не't.

Вот DAG, которую я использую:

from datetime import datetime, timedelta
import random

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'queue': 'airflow',
    'start_date': datetime.today() - timedelta(days=2),
    'schedule_interval': None,
    'retries': 2,
    'retry_delay': timedelta(seconds=15),
    'priority_weight': 10,
}


example_dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)


def always_succeed():
    pass


always_succeed_operator = PythonOperator(
    dag=example_dag,
    python_callable=always_succeed,
    task_id='always_succeed'
)


def might_fail():
    return 1 / random.randint(0, 1)


might_fail_operator = PythonOperator(
    dag=example_dag, python_callable=might_fail, task_id='might_fail'
)


might_fail_operator.set_upstream(always_succeed_operator)

1 Ответ

0 голосов
/ 06 января 2019

Cloud Composer не поддерживает несколько очередей сельдерея, пожалуйста, удалите 'queue' : 'airflow' из аргументов по умолчанию.Это должно исправить вашу проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...