Блокировка воздушного потока при параллельном запуске нескольких узлов в SubDAG - PullRequest
0 голосов
/ 05 февраля 2020

Структура DAG: У нас есть конвейер ETL, имеющий несколько фаз. Каждая фаза может иметь дочерние фазы внутри, что также верно рекурсивно (до известной глубины, около 3-4). Самый внутренний слой состоит из SQL (в основном с переменным числом, в среднем около 30), выполняемых либо параллельно, либо последовательно, либо одновременно.
Для каждой фазы мы использовали SubDAG.
Мы понимаем, что использование SubDAG - плохая практика, но из-за нашей иерархической структуры они становятся естественным выбором. Мы используем Celery Executor для SubDAG .
Для конечных узлов в самом внутреннем слое у нас есть пользовательский оператор.
Проблема: Мы (очень) время от времени заходим в тупик, и задача SubDAG, в которой размещаются SQL узлы, не выполняется. Нет сбоев дочерних узлов.
Ошибка: sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
StackTrace:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/dist-packages/airflow/operators/subdag_operator.py", line 102, in execute
    executor=self.executor)
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/dag.py", line 1284, in run
    job.run()
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/base_job.py", line 222, in run
    self._execute()
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/backfill_job.py", line 769, in _execute
    session=session)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/backfill_job.py", line 699, in _execute_for_run_dates
    session=session)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/backfill_job.py", line 586, in _process_backfill_task_instances
    _per_task_process(task, key, ti)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/backfill_job.py", line 508, in _per_task_process
    session.commit()
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 1036, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 503, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2479, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2617, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2577, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
    update,
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 996, in _emit_update_statements
    statement, multiparams
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 982, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 152, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 581, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 209, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 315, in _query
    db.query(q)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 239, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET state=%s, queued_dttm=%s WHERE task_instance.task_id = %s AND task_instance.dag_id = %s AND task_instance.execution_date = %s]
[parameters: ('queued', datetime.datetime(2020, 1, 14, 10, 51, 48, 301880, tzinfo=<Timezone [UTC]>), 'SQL_W', 'RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ', datetime.datetime(2020, 1, 14, 10, 28, 36, 123853, tzinfo=<Timezone [UTC]>))]
(Background on this error at: http://sqlalche.me/e/e3q8)


Кроме того, мы также видим предупреждение в журналах как ниже.

[2020-01-14 10:51:43,217] {logging_mixin.py:112} INFO - [2020-01-14 10:51:43,217] {backfill_job.py:246}
WARNING - ('RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ', 'SQL_A', datetime.datetime(2020, 1, 14, 10, 28, 36, 123853, tzinfo=<Timezone [UTC]>), 1) state success not in running=dict_values(
[<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_B 2020-01-14 10:28:36.123853+00:00 [running]>,
<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_C 2020-01-14 10:28:36.123853+00:00 [running]>,
<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_D 2020-01-14 10:28:36.123853+00:00 [running]>,
<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_E 2020-01-14 10:28:36.123853+00:00 [running]>,
<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_F 2020-01-14 10:28:36.123853+00:00 [running]>])


Версия воздушного потока: 1.10.5, 1.10.7

Мы обнаружили, что AIRFLOW-2516 является проблема, с которой мы столкнулись, этот комментарий, указывающий на вероятную причину. Предполагается, что тупик возник из-за двух запросов, получающих блокировки для двух индексов в разном порядке.
Отбрасывание второго индекса состояния задачи может быть не очень хорошей идеей, так как это повлияет на производительность планировщика воздушного потока, которая уже невелика.
Кажется, они исправили тупик на основном уровне DAG, а не на уровне SubDAG.
Мы пытались автоматически повторить попытку того же оператора SubDAG через минуту, используя параметр BaseOperator retries, но в большинстве случаев тупик также возникает при повторной попытке.
Та же проблема может возникнуть, даже если мы удалим SubDAG, что не будет для нас тривиальным упражнением.
Любые боковые решения / обходные пути также приветствуются.
Пожалуйста, дайте мне знать, если требуется файл airflow.cfg.

...