Мы используем Apache Airflow для запуска DML в таких базах данных, как Postgres и Redshift.
Прямо сейчас мы используем хук Postgres , который реализует psycopg2.
Когда я запускаю команды COPY
или DML, такие как Delete
, Insert
, Update
, я не получаю затронутое количество записей в журналах Airflow. Я получаю их как вывод, когда выполняю их с помощью инструмента командной строки psql
.
Ниже приведен код, который выполняет SQL в моей базе кода:
def execute(self, context):
hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.schema)
if self.pg_preoperator:
logging.info("Setting up Postgres operator.")
hook.run(self.pg_preoperator)
logging.info('Executing: ' + str(self.sql))
logging.info('Parameters: ' + str(self.parameters))
hook.run(self.sql, self.autocommit, parameters=self.parameters)
if self.pg_postoperator:
logging.info("Finished Postgres query.")
hook.run(self.pg_postoperator)
Как мне изменить код, чтобы получать выходные записи, затронутые в моих журналах Airflow? В коде хука Postgres я не нашел ни одной переменной, которая, по-моему, фиксирует вывод cursor.execute
.
Кроме того, ниже приведена команда запуска, которая в конечном итоге называется:
def run(self, sql, autocommit=False, parameters=None):
"""
Runs a command or a list of commands. Pass a list of sql
statements to the sql parameter to get them to execute
sequentially
:param sql: the sql statement to be executed (str) or a list of
sql statements to execute
:type sql: str or list
:param autocommit: What to set the connection's autocommit setting to
before executing the query.
:type autocommit: bool
:param parameters: The parameters to render the SQL query with.
:type parameters: mapping or iterable
"""
if isinstance(sql, str):
sql = [sql]
with closing(self.get_conn()) as conn:
if self.supports_autocommit:
self.set_autocommit(conn, autocommit)
with closing(conn.cursor()) as cur:
for s in sql:
if parameters is not None:
self.log.info("{} with parameters {}".format(s, parameters))
cur.execute(s, parameters)
else:
self.log.info(s)
cur.execute(s)
# If autocommit was set to False for db that supports autocommit,
# or if db does not supports autocommit, we do a manual commit.
if not self.get_autocommit(conn):
conn.commit()