Планировщик воздушного потока выдает ошибку для групп доступности баз данных с schedule_interval как None - PullRequest
0 голосов
/ 08 апреля 2020

У меня проблема с потоком воздуха. Существует скрипт генератора клиента, который принимает входные данные из файла yaml и загружает группы DAG. Это прекрасно работает, когда все файлы DAG yaml имеют интервал расписания как «Нет». Существует много групп доступности баз данных, для которых для schedule_interval задано значение None, и немногие из них имеют @ один раз.

Пример файла YAML: -

cluster:
  nodes: 10
  subnet: "subnet-A"
  instance: "m4.2xlarge"
  configbucket: "bucketabc"
  jar: "s3://xxxxx.jar"
  conf: "app.conf"

schedule:
  state: "unpause"
  concurrency: 10
  startdate: "2050-08-05 00:00"
  cron: "None"

Ниже приведен сценарий генератора -

            if "schedule" in project_settings:
                schedule_settings = project_settings["schedule"]
                concurrency = schedule_settings["concurrency"]
                cron =  schedule_settings["cron"]
                startdate =  datetime.strptime(schedule_settings["startdate"], "%Y-%m-%d %H:%M")

            #print "my projectname is: " + project

            dag = DAG(
                dag_id = project,
                default_args=args,
                user_defined_macros=user_macros,
                schedule_interval=cron,
                concurrency=concurrency,
                start_date=startdate
            )

Ошибка, которую я получаю, когда есть много групп DAG с schedule_interval = None

INFO - [2020-04-08 12:30:45,529] {dagbag.py:302} ERROR - Failed to bag_dag: /home/deploy/airflow/dags/genertor.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 296, in process_file
    croniter(dag._schedule_interval)
  File "/usr/local/lib/python3.6/site-packages/croniter/croniter.py", line 91, in __init__
    self.expanded, self.nth_weekday_of_month = self.expand(expr_format)
  File "/usr/local/lib/python3.6/site-packages/croniter/croniter.py", line 468, in expand
    raise CroniterBadCronError(cls.bad_length)
croniter.croniter.CroniterBadCronError: Exactly 5 or 6 columns has to be specified for iteratorexpression.

Кто-нибудь сталкивался с этой проблемой?

1 Ответ

2 голосов
/ 08 апреля 2020

Воздушный поток DAG schedule_interval может быть cron эспрессией как string или None (NB, а не string "None").

В ваших настройках вы иметь:

cron: "None"

, что является строкой в ​​Python. Если вы не можете изменить этот файл YAML на:

cron: None

, вы все равно можете проверить эту строку в самой группе обеспечения доступности баз данных:

schedule_interval = None if cron == "None" else cron
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...