ошибки подключения планировщика airflow 1.10.5 к postgres - PullRequest
0 голосов
/ 25 сентября 2019

Airflow 1.10.5 Celery executor

Сбой планировщика воздушного потока с ошибкой соединения postgres sqlalchemy (подробности ошибки приведены ниже). Эта ошибка существует в одной среде, другая аналогичная среда работает.Существуют ли какие-либо несовместимости версий, оцените любую помощь для их устранения.

Чтобы изолировать проблему, тестирование соединений с postgres вручную работает без проблем

Пример кода соединения (работает sqlalchemy postgres)

from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://username:password@servername.postgres.database.azure.com/airflow", echo=True, pool_size=6, max_overflow=10, encoding='latin1')

engine.connect()

print(engine)

Ошибка

Celery Task ID: ('example_dag', 'bash_task', datetime.datetime(2018, 10, 8, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
Traceback (most recent call last):
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2275, in _wrap_pool_connect
    return fn()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 363, in connect
    return _ConnectionFairy._checkout(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 760, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 492, in checkout
    rec = pool._do_get()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 139, in _do_get
    self._dec_overflow()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 136, in _do_get
    return self._create_connection()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
    return _ConnectionRecord(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
    self.__connect(first_connect_check=True)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 639, in __connect
    connection = pool._invoke_creator(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
    return dialect.connect(*cargs, **cparams)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 453, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/psycopg2/__init__.py", line 126, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: could not connect to server: No such file or directory
    Is the server running locally and accepting
    connections on Unix domain socket "/var/run/postgresql/.s.PGSQL.5432"?


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/airflow/executors/celery_executor.py", line 106, in fetch_celery_task_state
    res = (celery_task[0], celery_task[1].state)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/result.py", line 473, in state
    return self._get_task_meta()['status']
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/result.py", line 412, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/base.py", line 386, in get_task_meta
    meta = self._get_task_meta_for(task_id)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/__init__.py", line 53, in _inner
    return fun(*args, **kwargs)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/__init__.py", line 122, in _get_task_meta_for
    session = self.ResultSession()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/__init__.py", line 99, in ResultSession
    **self.engine_options)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/session.py", line 59, in session_factory
    self.prepare_models(engine)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/session.py", line 54, in prepare_models
    ResultModelBase.metadata.create_all(engine)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/sql/schema.py", line 4294, in create_all
    ddl.SchemaGenerator, self, checkfirst=checkfirst, tables=tables
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2045, in _run_visitor
    with self._optional_conn_ctx_manager(connection) as conn:
  File "/usr/lib64/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2037, in _optional_conn_ctx_manager
    with self._contextual_connect() as conn:
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2239, in _contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2279, in _wrap_pool_connect
    e, dialect, self
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1544, in _handle_dbapi_exception_noconnection
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
    raise value.with_traceback(tb)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2275, in _wrap_pool_connect
    return fn()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 363, in connect
    return _ConnectionFairy._checkout(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 760, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 492, in checkout
    rec = pool._do_get()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 139, in _do_get
    self._dec_overflow()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 136, in _do_get
    return self._create_connection()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
    return _ConnectionRecord(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
    self.__connect(first_connect_check=True)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 639, in __connect
    connection = pool._invoke_creator(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
    return dialect.connect(*cargs, **cparams)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 453, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/psycopg2/__init__.py", line 126, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not connect to server: No such file or directory
    Is the server running locally and accepting
    connections on Unix domain socket "/var/run/postgresql/.s.PGSQL.5432"?

(Background on this error at: http://sqlalche.me/e/e3q8)
...