Airflow DAG получает psycopg2.OperationalError при создании задач с KubernetesPodOperator - PullRequest
0 голосов
/ 29 сентября 2019
ERROR - Scheduler heartbeat got an exception: (psycopg2.OperationalError) could not translate host name "airflow-prod-postgresql" to address: Temporary failure in name resolution  (Background on this error at: http://sqlalche.me/e/e3q8)

В настоящее время я выполняю задание в Airflow, в котором параллельно выполняется около 10 задач, и для простоты мы можем предположить, что все эти задачи подключаются к GBQ, выполняют какой-либо запрос SQL, а затем возвращают выводв качестве кадра данных панд.Я настроил Airflow таким образом, чтобы каждая задача представляла собой свой собственный контейнер Docker внутри модуля Kubernetes, сгенерированного KubernetesPodOperator.Кластер Kubernetes настроен с помощью Amazon EKS и в настоящее время имеет два узла (экземпляры EC2) с включенным автоматическим масштабированием максимум для четырех узлов.

Однако при работе с AirAG DAG только около 2/10из задач завершаются успешно, в то время как остальные получают psycopg2.OperationalError.Некоторые из других невыполненных задач получают сообщение No host supplied вместе с Log file does not exist.

. При запуске DAG в разное время сбойные задачи не всегда согласованы.Похоже, корень этой проблемы связан с подключением к базе данных из-за сообщения об ошибке psycopg2.Из документов по SQLAlchemy, касающихся OperationalError:

Exception raised for errors that are related to the database’s operation and not necessarily under the control of the programmer, e.g. an unexpected disconnect occurs, the data source name is not found, a transaction could not be processed, a memory allocation error occurred during processing, etc.
This error is a DBAPI Error and originates from the database driver (DBAPI), not SQLAlchemy itself.
The OperationalError is the most common (but not the only) error class used by drivers in the context of the database connection being dropped, or not being able to connect to the database. For tips on how to deal with this, see the section Dealing with Disconnects.

Но, честно говоря, я недостаточно разбираюсь в том, чтобы определить, является ли решение какой-либо конфигурацией на уровне ядра Airflow (глобального) или DAGопределение (локальный) уровень.Кроме того, проблема может заключаться в том, что настройка кластера Kubernetes, например, автоматическое масштабирование из Amazon EKS, не работает должным образом.

Для большего контекста у меня есть pool_size=5 и pool_recycle=1800 и использование CeleryExecutor.

В отдельном испытании я установил concurrency=4 в определении DAG, и я вижу похожие сбои, за исключением основного psycopg2.OperationalError, который выглядит немного иначе:

ERROR - Scheduler heartbeat got an exception: (psycopg2.OperationalError) could not connect to server: Connection timed out
    Is the server running on host "airflow-prod-postgresql" and accepting
    TCP/IP connections on port 5432?
 (Background on this error at: http://sqlalche.me/e/e3q8)

Дополнительная ошибкасообщения от других невыполненных задач -

Первая попытка:

  1. No host supplied
  2. Failed to fetch log file from worker

При второй попытке:

  1. dependency 'Task Instance Not Already Running' FAILED: Task is already running
  2. Dependencies not met for TaskInstance: dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.

Я ожидаю, что все 10 задач будут выполняться в этой группе DAG Airflow с высокой доступностью, в отличие оттолько примерно 2 успешных задания.

Заранее большое спасибо !!

...