Как я могу показать вывод DML, выполненных в Airflow через PostgresOperator? - PullRequest
0 голосов
/ 29 мая 2019

Мы используем Apache Airflow для запуска DML в таких базах данных, как Postgres и Redshift.

Прямо сейчас мы используем хук Postgres , который реализует psycopg2.

Когда я запускаю команды COPY или DML, такие как Delete, Insert, Update, я не получаю затронутое количество записей в журналах Airflow. Я получаю их как вывод, когда выполняю их с помощью инструмента командной строки psql.

Ниже приведен код, который выполняет SQL в моей базе кода:

def execute(self, context):
    hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.schema)

    if self.pg_preoperator:
        logging.info("Setting up Postgres operator.")
        hook.run(self.pg_preoperator)

    logging.info('Executing: ' + str(self.sql))
    logging.info('Parameters: ' + str(self.parameters))
    hook.run(self.sql, self.autocommit, parameters=self.parameters)

    if self.pg_postoperator:
        logging.info("Finished Postgres query.")
        hook.run(self.pg_postoperator)

Как мне изменить код, чтобы получать выходные записи, затронутые в моих журналах Airflow? В коде хука Postgres я не нашел ни одной переменной, которая, по-моему, фиксирует вывод cursor.execute.

Кроме того, ниже приведена команда запуска, которая в конечном итоге называется:

def run(self, sql, autocommit=False, parameters=None):
    """
    Runs a command or a list of commands. Pass a list of sql
    statements to the sql parameter to get them to execute
    sequentially
    :param sql: the sql statement to be executed (str) or a list of
        sql statements to execute
    :type sql: str or list
    :param autocommit: What to set the connection's autocommit setting to
        before executing the query.
    :type autocommit: bool
    :param parameters: The parameters to render the SQL query with.
    :type parameters: mapping or iterable
    """
    if isinstance(sql, str):
        sql = [sql]

    with closing(self.get_conn()) as conn:
        if self.supports_autocommit:
            self.set_autocommit(conn, autocommit)

        with closing(conn.cursor()) as cur:
            for s in sql:
                if parameters is not None:
                    self.log.info("{} with parameters {}".format(s, parameters))
                    cur.execute(s, parameters)
                else:
                    self.log.info(s)
                    cur.execute(s)

        # If autocommit was set to False for db that supports autocommit,
        # or if db does not supports autocommit, we do a manual commit.
        if not self.get_autocommit(conn):
            conn.commit()
...