Airflow 1.10.5 Celery executor
Планировщик Airflow аварийно завершается с ошибкой подключения SQLAlchemy к PostgreSQL (подробности ошибки приведены ниже). Ошибка возникает только в одной среде; другая, аналогичная среда работает нормально. Нет ли здесь несовместимости версий? Буду признателен за любую помощь в устранении проблемы.
Чтобы локализовать проблему, подключение к PostgreSQL было проверено вручную — оно работает без ошибок.
Пример кода подключения (SQLAlchemy + PostgreSQL, работает):
from sqlalchemy import create_engine
engine = create_engine("postgresql+psycopg2://username:password@servername.postgres.database.azure.com/airflow", echo=True, pool_size=6, max_overflow=10, encoding='latin1')
engine.connect()
print(engine)
Ошибка
Celery Task ID: ('example_dag', 'bash_task', datetime.datetime(2018, 10, 8, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
Traceback (most recent call last):
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2275, in _wrap_pool_connect
return fn()
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 363, in connect
return _ConnectionFairy._checkout(self)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 760, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 492, in checkout
rec = pool._do_get()
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 139, in _do_get
self._dec_overflow()
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
raise value
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 136, in _do_get
return self._create_connection()
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
return _ConnectionRecord(self)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
self.__connect(first_connect_check=True)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 639, in __connect
connection = pool._invoke_creator(self)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
return dialect.connect(*cargs, **cparams)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 453, in connect
return self.dbapi.connect(*cargs, **cparams)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/psycopg2/__init__.py", line 126, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: could not connect to server: No such file or directory
Is the server running locally and accepting
connections on Unix domain socket "/var/run/postgresql/.s.PGSQL.5432"?
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/airflow/executors/celery_executor.py", line 106, in fetch_celery_task_state
res = (celery_task[0], celery_task[1].state)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/result.py", line 473, in state
return self._get_task_meta()['status']
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/result.py", line 412, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/base.py", line 386, in get_task_meta
meta = self._get_task_meta_for(task_id)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/__init__.py", line 53, in _inner
return fun(*args, **kwargs)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/__init__.py", line 122, in _get_task_meta_for
session = self.ResultSession()
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/__init__.py", line 99, in ResultSession
**self.engine_options)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/session.py", line 59, in session_factory
self.prepare_models(engine)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/session.py", line 54, in prepare_models
ResultModelBase.metadata.create_all(engine)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/sql/schema.py", line 4294, in create_all
ddl.SchemaGenerator, self, checkfirst=checkfirst, tables=tables
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2045, in _run_visitor
with self._optional_conn_ctx_manager(connection) as conn:
File "/usr/lib64/python3.6/contextlib.py", line 81, in __enter__
return next(self.gen)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2037, in _optional_conn_ctx_manager
with self._contextual_connect() as conn:
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2239, in _contextual_connect
self._wrap_pool_connect(self.pool.connect, None),
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2279, in _wrap_pool_connect
e, dialect, self
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1544, in _handle_dbapi_exception_noconnection
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
raise value.with_traceback(tb)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2275, in _wrap_pool_connect
return fn()
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 363, in connect
return _ConnectionFairy._checkout(self)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 760, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 492, in checkout
rec = pool._do_get()
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 139, in _do_get
self._dec_overflow()
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
raise value
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 136, in _do_get
return self._create_connection()
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
return _ConnectionRecord(self)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
self.__connect(first_connect_check=True)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 639, in __connect
connection = pool._invoke_creator(self)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
return dialect.connect(*cargs, **cparams)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 453, in connect
return self.dbapi.connect(*cargs, **cparams)
File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/psycopg2/__init__.py", line 126, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not connect to server: No such file or directory
Is the server running locally and accepting
connections on Unix domain socket "/var/run/postgresql/.s.PGSQL.5432"?
(Background on this error at: http://sqlalche.me/e/e3q8)