Воздушный поток + панды read_sql_query () с коммитом - PullRequest
0 голосов
/ 13 декабря 2018

Вопрос

Можно ли зафиксировать транзакцию SQL в БД с помощью read_sql ()?

Вариант использования и фон

У меня есть сценарий использования, в котором я хочу разрешить пользователям выполнять некоторый предопределенный SQL и возвращать кадр данных pandas.В некоторых случаях этот SQL должен будет выполнить запрос к предварительно заполненной таблице, а в других случаях этот SQL будет выполнять функцию, которая будет писать в таблицу, а затем эта таблица будет запрошена.Эта логика в настоящее время содержится внутри метода в DAG Airflow, чтобы использовать информацию о соединении с базой данных, доступную для Airflow с помощью PostgresHook - метод в конечном итоге вызывается в задаче PythonOperator.В ходе тестирования я понял, что PostgresHook создает объект подключения psycopg2.

Код

from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd 

def create_df(job_id,other_unrelated_inputs):
    conn = job_type_to_connection(job_type) # method that helps choose a database
    sql = open('/sql_files/job_id_{}.sql'.format(job_id)) #chooses arbitrary SQL  
    sql_template = sql.read() 
    hook = PostgresHook(postgres_conn_id=conn) #connection information for alias is predefined elsewhere within Airflow


    try:
        hook_conn_obj = hook.get_conn()
        print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
        # Runs SQL template with variables, but does not commit. Alternatively, have used hook.get_pandas_df(sql_template)
        df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj) 
    except:
        #catches some errors#
    return df

Проблема

В настоящее времяпри выполнении функции SQL этот код генерирует кадр данных, но не фиксирует какие-либо изменения БД, сделанные в функции SQL.Например, если быть более точным, если функция SQL вставляет строку в таблицу, эта транзакция не будет зафиксирована и строка не появится в таблице.

Попытки

Я предпринял несколько исправлений, но застрял.Моя последняя попытка состояла в том, чтобы изменить атрибут autocommit соединения psycopg2, который read_sql использует для автоматического подтверждения транзакции.

Я признаю, что не смог выяснить, когда атрибуты соединения имеютвлияние на выполнение SQL.

Я понимаю, что альтернативным путем является репликация некоторой логики в PostgresHook.run () для фиксации, а затем добавление некоторого кода для отправки результатов в фрейм данных, но это кажется более экономными проще для будущей поддержки использовать уже созданные методы, если это возможно.

Самый аналогичный вопрос SO, который я мог найти, был этот , но я заинтересован в решении, не зависящем от Airflow.

РЕДАКТИРОВАТЬ

...
    try:
        hook_conn_obj = hook.get_conn()
        print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
        hook_conn_obj.autocommit = True
        df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj) # Runs SQL template with variables, but does not commit
    except:
        #catches some errors#
    return df

Кажется, это работает.Если у кого-то есть какие-либо комментарии или мысли о лучшем способе достижения этого, мне все равно интересно узнать из обсуждения.

Спасибо!

1 Ответ

0 голосов
/ 13 декабря 2018

read_sql не будет фиксироваться, поскольку, как следует из названия этого метода, цель состоит в том, чтобы читать данные, а не записывать.Это хороший выбор дизайна от pandas.Это важно, потому что это предотвращает случайные записи и позволяет интересные сценарии, такие как запуск процедуры, чтение ее эффектов, но ничего не сохраняется.read_sql намеревается читать, а не писать.Прямое выражение намерения является принципом золотого стандарта.

Более явным способом выразить свое намерение было бы явное execute (с фиксацией) перед fetchall.Но поскольку pandas не предлагает простого способа чтения с объекта cursor, вы потеряете спокойствие, обеспечиваемое read_sql, и вам придется создавать DataFrame самостоятельно.

Так что в целом ваше решениевсе в порядке, установив autocommit=True, вы указываете, что ваши взаимодействия с базой данных будут продолжаться независимо от того, что они делают, поэтому не должно быть аварий.Это немного странно читать, но если вы назвали свою переменную sql_template как-то вроде write_then_read_sql или объясните в строке документации, намерение будет более ясным.

...