Программно очистить состояние экземпляров задачи воздушного потока - PullRequest
1 голос
/ 01 октября 2019

Я хочу очистить задачи в DAG B, когда DAG A завершит выполнение. И A, и B являются запланированными группами обеспечения доступности баз данных.

Есть ли какой-либо оператор / способ очистки состояния задачи и программного запуска повторных задач?

Мне известно о CLI option и опция Web UI для очистки задач.

1 Ответ

1 голос
/ 01 октября 2019
  • cli.py - невероятно полезное место, чтобы заглянуть в SQLAlchemy магию Airflow.
  • Команда clearреализовано здесь
@cli_utils.action_logging
def clear(args):
    logging.basicConfig(
        level=settings.LOGGING_LEVEL,
        format=settings.SIMPLE_LOG_FORMAT)
    dags = get_dags(args)

    if args.task_regex:
        for idx, dag in enumerate(dags):
            dags[idx] = dag.sub_dag(
                task_regex=args.task_regex,
                include_downstream=args.downstream,
                include_upstream=args.upstream)

    DAG.clear_dags(
        dags,
        start_date=args.start_date,
        end_date=args.end_date,
        only_failed=args.only_failed,
        only_running=args.only_running,
        confirm_prompt=not args.no_confirm,
        include_subdags=not args.exclude_subdags,
        include_parentdag=not args.exclude_parentdag,
    )
  • Глядя на источник, вы можете либо
    • скопировать его (при условии, что вы также хотитенемного изменить функциональность)
    • или, может быть, просто сделать from airflow.bin import cli и вызвать нужные функции напрямую
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...