Структура DAG: У нас есть конвейер ETL, имеющий несколько фаз. Каждая фаза может иметь дочерние фазы внутри, что также верно рекурсивно (до известной глубины, около 3-4). Самый внутренний слой состоит из SQL (в основном с переменным числом, в среднем около 30), выполняемых либо параллельно, либо последовательно, либо одновременно.
Для каждой фазы мы использовали SubDAG.
Мы понимаем, что использование SubDAG - плохая практика, но из-за нашей иерархической структуры они становятся естественным выбором. Мы используем Celery Executor для SubDAG .
Для конечных узлов в самом внутреннем слое у нас есть пользовательский оператор.
Проблема: Мы (очень) время от времени заходим в тупик, и задача SubDAG, в которой размещаются SQL узлы, не выполняется. Нет сбоев дочерних узлов.
Ошибка: sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
StackTrace:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/dist-packages/airflow/operators/subdag_operator.py", line 102, in execute
executor=self.executor)
File "/usr/local/lib/python3.6/dist-packages/airflow/models/dag.py", line 1284, in run
job.run()
File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/base_job.py", line 222, in run
self._execute()
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/backfill_job.py", line 769, in _execute
session=session)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/backfill_job.py", line 699, in _execute_for_run_dates
session=session)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/backfill_job.py", line 586, in _process_backfill_task_instances
_per_task_process(task, key, ti)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/backfill_job.py", line 508, in _per_task_process
session.commit()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 1036, in commit
self.transaction.commit()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 503, in commit
self._prepare_impl()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2479, in flush
self._flush(objects)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2617, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 153, in reraise
raise value
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/session.py", line 2577, in _flush
flush_context.execute()
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
update,
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/orm/persistence.py", line 996, in _emit_update_statements
statement, multiparams
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 982, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 152, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1246, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 581, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 209, in execute
res = self._query(query)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 315, in _query
db.query(q)
File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 239, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET state=%s, queued_dttm=%s WHERE task_instance.task_id = %s AND task_instance.dag_id = %s AND task_instance.execution_date = %s]
[parameters: ('queued', datetime.datetime(2020, 1, 14, 10, 51, 48, 301880, tzinfo=<Timezone [UTC]>), 'SQL_W', 'RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ', datetime.datetime(2020, 1, 14, 10, 28, 36, 123853, tzinfo=<Timezone [UTC]>))]
(Background on this error at: http://sqlalche.me/e/e3q8)
Кроме того, мы также видим предупреждение в журналах как ниже.
[2020-01-14 10:51:43,217] {logging_mixin.py:112} INFO - [2020-01-14 10:51:43,217] {backfill_job.py:246}
WARNING - ('RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ', 'SQL_A', datetime.datetime(2020, 1, 14, 10, 28, 36, 123853, tzinfo=<Timezone [UTC]>), 1) state success not in running=dict_values(
[<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_B 2020-01-14 10:28:36.123853+00:00 [running]>,
<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_C 2020-01-14 10:28:36.123853+00:00 [running]>,
<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_D 2020-01-14 10:28:36.123853+00:00 [running]>,
<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_E 2020-01-14 10:28:36.123853+00:00 [running]>,
<TaskInstance: RANDOM_DAG_ID.PhaseX.PhaseY.PhaseZ.SQL_F 2020-01-14 10:28:36.123853+00:00 [running]>])
Версия воздушного потока: 1.10.5, 1.10.7
Мы обнаружили, что AIRFLOW-2516 является проблема, с которой мы столкнулись, этот комментарий, указывающий на вероятную причину. Предполагается, что тупик возник из-за двух запросов, получающих блокировки для двух индексов в разном порядке.
Отбрасывание второго индекса состояния задачи может быть не очень хорошей идеей, так как это повлияет на производительность планировщика воздушного потока, которая уже невелика.
Кажется, они исправили тупик на основном уровне DAG, а не на уровне SubDAG.
Мы пытались автоматически повторить попытку того же оператора SubDAG через минуту, используя параметр BaseOperator retries
, но в большинстве случаев тупик также возникает при повторной попытке.
Та же проблема может возникнуть, даже если мы удалим SubDAG, что не будет для нас тривиальным упражнением.
Любые боковые решения / обходные пути также приветствуются.
Пожалуйста, дайте мне знать, если требуется файл airflow.cfg.