Автокоммит Airflow PostgresOperator (база данных красного смещения) не работает - PullRequest
0 голосов
/ 25 мая 2020
  • Воздушный поток: v1.10.3
  • Redshift: v1.0.15896

У меня есть оператор SQL, который (повторно) создает схему, а затем создает представление в этой схеме.

Оператор SQL выглядит следующим образом:

drop schema if exists my_schema CASCADE;
create schema my_schema;
create or replace view my_schema.my_view as select * from other_schema.table with no schema binding;

Если я запустил выше SQL операторы через клиентский инструмент (например, DBeaver или Datagrip, в режиме автоматической фиксации) ) все работает должным образом и не возвращает ошибок.

Однако, когда я запускаю тот же оператор SQL через Airflow с помощью PostgresOperator, я всегда получаю следующее сообщение об ошибке:

[2020-05-25 15:03:49,155] {__init__.py:1580} ERROR - schema "my_schema" does not exist
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/operators/postgres_operator.py", line 65, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/usr/local/lib/python2.7/dist-packages/airflow/hooks/dbapi_hook.py", line 172, in run
    cur.execute(s)
ProgrammingError: schema "my_schema" does not exist

Вот как я выполняю оператор SQL в моей группе DAG:

[...]
from airflow.operators.postgres_operator import PostgresOperator
[...]
PostgresOperator(
        task_id=task_id, postgres_conn_id="my_connection", sql=sql, autocommit=True
)

Если я сначала запускаю «drop and re-create», а «create or replace view» во втором операторе, он работает без вопрос.

Это также работает, если схема уже существует.

Следовательно, я подозреваю, что почему-то не происходит фиксации, хотя для autocommit установлено значение True.

Я уже пробовал много вещей в операторах SQL, включая

  • завершение создания схемы в транзакции с помощью begin; и конец;" операторы
  • Добавление «COMMIT;» после создания (повторного) создания схемы
  • Добавление "set AUTOCOMMIT TO ON;"

но пока ничего не помогло. Любая помощь будет принята с благодарностью.

...