Я использую Airflow DAG с помощью Google Cloud Composer , который имеет следующее определение:
with airflow.DAG(
'build_daily_rollups',
default_args={
'owner': 'airflow',
'start_date': datetime(2019, 5, 7, 6, tzinfo=new_york_tz),
'concurrency': 1,
'retries': 1 # safe b/c this DAG is idempotent
},
schedule_interval=timedelta(hours=24),
catchup=False
) as dag:
roll_up_tasks = []
for feed_id in FEEDMAP:
task_id = f'roll_up_feed_{feed_id}'
task = PythonOperator(
task_id=task_id,
params={'feed_id': feed_id, 'ts_f_strings': ts_f_strings},
python_callable=parse_feed,
provide_context=True
)
dag.add_task(task)
roll_up_tasks.append(task)
packager = PythonOperator(
task_id='build_package',
python_callable=build_package
)
for task in roll_up_tasks:
task >> packager
(я опускаю вызываемый код parse_feed
, поскольку он не особенно актуаленк этой проблеме)
Я запустил этот DAG вручную.Группа обеспечения доступности баз данных вошла в состояние «выполнения», а затем ничего не произошло - ни одна задача не была запланирована или даже поставлена в очередь, и состояние каждой задачи в группе доступности базы данных остается null
.При попытке перейти к панели «Сведения об экземпляре задачи» для этой группы доступности базы данных я получил следующую ошибку:
Traceback (most recent call last):
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1988, in wsgi_app
response = self.full_dispatch_request()
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1641, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1544, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/opt/python3.6/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise
raise value
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1639, in full_dispatch_request
rv = self.dispatch_request()
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1625, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/flask_login.py", line 755, in decorated_view
return func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/www/utils.py", line 262, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/www/views.py", line 788, in task
dep_context=dep_context)]
File "/usr/local/lib/airflow/airflow/www/views.py", line 786, in <listcomp>
failed_dep_reasons = [(dep.dep_name, dep.reason) for dep in
File "/usr/local/lib/airflow/airflow/models.py", line 1210, in get_failed_dep_statuses
dep_context):
File "/usr/local/lib/airflow/airflow/ti_deps/deps/base_ti_dep.py", line 100, in get_dep_statuses
for dep_status in self._get_dep_statuses(ti, session, dep_context):
File "/usr/local/lib/airflow/airflow/ti_deps/deps/exec_date_after_start_date_dep.py", line 24, in _get_dep_statuses
if ti.task.start_date and ti.execution_date < ti.task.start_date:
TypeError: can't compare offset-naive and offset-aware datetimes
Это определение группы доступности базы данных должно учитывать часовой пояс, и эта параметризация - то, что я получил от , читая соответствующий раздел документации по воздушному потоку .Кроме того, мне удалось успешно перейти на эту страницу в локальном экземпляре Airflow на моем персональном компьютере.
Какова вероятная причина этой ошибки?