Задачи Apache Airflow застряли в состоянии «up_for_retry» - PullRequest
0 голосов
/ 12 декабря 2018

Я настраивал кластер воздушного потока в нашей системе, и ранее он работал.Я не уверен, что я мог сделать, чтобы изменить это.

У меня есть группа доступности базы данных, которую я хочу запустить по расписанию.Чтобы убедиться, что он работает, я также хотел бы запустить его вручную.Похоже, что в данный момент ни один из них не работает, и для экземпляров задачи не пишутся журналы.Единственными доступными журналами являются журналы планировщика воздушного потока, которые обычно выглядят исправными.

Я просто постоянно получаю это сообщение:

Task is not ready for retry yet but will be retried automatically. Current date is 2018-12-12T11:34:46.978355+00:00 and task will be retried at 2018-12-12T11:35:08.093313+00:00.

Однако, если я подождунемного точно то же самое сообщение представлено снова, за исключением того, что времена немного продвинулись вперед.Следовательно, кажется, что на самом деле задача никогда не повторяется.

Я использую LocalExecutor, а задача - SSHOperator.Упрощенный код ниже.Все, что он делает - это ssh на другую машину и запускает группу приложений с заранее определенной структурой каталогов.: * 10101 *

DB_INFO_FILE = 'info.json'
START_SCRIPT = '/bin/start.sh'
TIME_IN_PAST = timezone.convert_to_utc(datetime.today() - 
timedelta(days=1))

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': TIME_IN_PAST,
    'email': ['some_email@blah.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

def _extract_instance_id(instance_string):
    return re.findall(r'\d+', instance_string)[0]

def _read_file_as_json(file_name):
    with open(file_name) as open_file:
         return json.load(open_file)

DB_INFO = _read_file_as_json(DB_INFO_FILE)
CONFIG_CLIENT = ConfigDbClient(**DB_INFO)

APP_DIRS = CONFIG_CLIENT.get_values('%my-app-info%')

INSTANCE_START_SCRIPT_PATHS = {
    _extract_instance_id(instance_string): directory+START_SCRIPT
    for instance_string, directory in APP_DIRS.items()
    }

# Create an ssh hook which refers to pre-existing connection information
# setup and stored by airflow
SSH_HOOK = SSHHook(ssh_conn_id='my-conn-id')

# Create a DAG object to add tasks to
DAG = DAG('my-dag-id',
          default_args=DEFAULT_ARGS)

# Create a task for each app instance.
for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
    task = SSHOperator(
        task_id='run-script-{0}'.format(instance_id),
        command='bash {0}'.format(start_script),
        ssh_hook=SSH_HOOK,
        dag=DAG)

Это работает, когда я запускаю задачи индивидуально, через командную строку, но не через пользовательский интерфейс,Кажется, я могу запускать задачи, но я просто не могу запустить DAG.Я пробовал много комбинаций start_date s и интервальных расписаний просто для проверки работоспособности.

Любые идеи?

И да, я знаю, что этот вопрос задавался ранее, и я посмотрел навсе они, но не решения, помогли мне.

1 Ответ

0 голосов
/ 18 декабря 2018

О.Ваш start_date меняется с той же скоростью или быстрее, чем интервал расписания.

Вот что планировщик видит каждые несколько секунд:

start_date: 2018-12-11T12:12:12.000Z  # E.G. IFF now is 2018-12-12T12:12:12.000Z, a day ago
schedule_interval: timedelta(days=1)  # The default

Вот что нужно планировщику длягруппа обеспечения доступности баз данных: в последний раз запуск выполнялся более одного интервала времени назад.Если не было выполнено запланированного прогона, первый запланированный прогон мог бы начаться сейчас, если после start_date прошел один полный интервал расписания, поскольку это самая ранняя допустимая дата для execution_date.В этом случае dag_run с execution_date, установленным в начало этого интервала, должен быть создан.Затем можно создать task_instance s для любых задач в группе обеспечения доступности баз данных, чьи зависимости выполняются, если task_instance execution_date находится после start_date группы обеспечения доступности баз данных (это не сохраняется в объекте dag_run, а перерассчитываетсязагружая файл DAG, просто проверяя состояние метки).

Таким образом, он не может быть запланирован автоматически по той причине, что дата начала продолжает меняться, так как интервал удовлетворен.Однако, если бы это было -2d, по крайней мере один запуск был бы запланирован, и тогда любые дальнейшие запуски должны были бы ждать, пока это не будет 1d после этого, чтобы быть запланированным.Однако проще, если вы просто установите фиксированное значение datetime на вашем start_date.

Но как насчет тех странных повторов в ваших ручных запусках ...

Вы запустили ручной запуск или два.Эти прогоны приняли текущее время как execution_date, если вы не указали что-то еще.Это должно быть после start_date, по крайней мере, до завтра, которое должно очистить их для запуска.Но затем в ваших журналах вы видите, что они терпят неудачу и помечаются для повторных попыток, а также не уменьшают ваши повторные попытки.Я не уверен, почему это так, но могло бы случиться так, что что-то не так с SSHOperator.

Вы установили поток воздуха с дополнительным [ssh], чтобы встретились зависимости SSHOperator (особенно paramiko и sshtunnel) на веб-сервере и в планировщике?Один из них работает, потому что я предполагаю, что он анализируется и отображается в пользовательском интерфейсе в зависимости от того, был ли он добавлен в БД.

Что вы получите, если выполните:

airflow test my-dag-id run-script-an_instance_id 2018-12-12T12:12:12

Вы знаете, чтопланировщик и веб-сервер зацикливаются на пополнении пакета DAG и перезапускают этот файл DAG несколько раз в день, перезагружая этот json (это локальный доступ, очень похожий на импорт модуля), и воссоздают этот SSHHook с поиском в БД,Я не вижу ничего необычного в настройке этого хука, почему бы просто не удалить ssh_hook из SSHOperator и заменить его на ssh_conn_id='my-conn-id', чтобы он мог быть создан один раз во время выполнения?Я сомневаюсь, что это проблема, которая вызывает повторные попытки, которые просто переносятся вперед.

...