Вопрос
Можно ли зафиксировать транзакцию SQL в БД с помощью read_sql ()?
Вариант использования и фон
У меня есть сценарий использования, в котором я хочу разрешить пользователям выполнять некоторый предопределенный SQL и возвращать кадр данных pandas.В некоторых случаях этот SQL должен будет выполнить запрос к предварительно заполненной таблице, а в других случаях этот SQL будет выполнять функцию, которая будет писать в таблицу, а затем эта таблица будет запрошена.Эта логика в настоящее время содержится внутри метода в DAG Airflow, чтобы использовать информацию о соединении с базой данных, доступную для Airflow с помощью PostgresHook - метод в конечном итоге вызывается в задаче PythonOperator.В ходе тестирования я понял, что PostgresHook создает объект подключения psycopg2.
Код
from airflow.hooks.postgres_hook import PostgresHook
import pandas as pd
def create_df(job_id,other_unrelated_inputs):
conn = job_type_to_connection(job_type) # method that helps choose a database
sql = open('/sql_files/job_id_{}.sql'.format(job_id)) #chooses arbitrary SQL
sql_template = sql.read()
hook = PostgresHook(postgres_conn_id=conn) #connection information for alias is predefined elsewhere within Airflow
try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
# Runs SQL template with variables, but does not commit. Alternatively, have used hook.get_pandas_df(sql_template)
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj)
except:
#catches some errors#
return df
Проблема
В настоящее времяпри выполнении функции SQL этот код генерирует кадр данных, но не фиксирует какие-либо изменения БД, сделанные в функции SQL.Например, если быть более точным, если функция SQL вставляет строку в таблицу, эта транзакция не будет зафиксирована и строка не появится в таблице.
Попытки
Я предпринял несколько исправлений, но застрял.Моя последняя попытка состояла в том, чтобы изменить атрибут autocommit соединения psycopg2, который read_sql использует для автоматического подтверждения транзакции.
Я признаю, что не смог выяснить, когда атрибуты соединения имеютвлияние на выполнение SQL.
Я понимаю, что альтернативным путем является репликация некоторой логики в PostgresHook.run () для фиксации, а затем добавление некоторого кода для отправки результатов в фрейм данных, но это кажется более экономными проще для будущей поддержки использовать уже созданные методы, если это возможно.
Самый аналогичный вопрос SO, который я мог найти, был этот , но я заинтересован в решении, не зависящем от Airflow.
РЕДАКТИРОВАТЬ
...
try:
hook_conn_obj = hook.get_conn()
print(type(hook_conn_obj)) # <class 'psycopg2.extensions.connection'>
hook_conn_obj.autocommit = True
df = pd.io.sql.read_sql(sql_template, con = hook_conn_obj) # Runs SQL template with variables, but does not commit
except:
#catches some errors#
return df
Кажется, это работает.Если у кого-то есть какие-либо комментарии или мысли о лучшем способе достижения этого, мне все равно интересно узнать из обсуждения.
Спасибо!