Убить незаконченный SQL совершенный запрос с помощью psycopg2, когда InterruptKey - PullRequest
1 голос
/ 13 января 2020

Я использую сценарий Python, который запускает различные SQL запросы на автоматическое принятие по базе данных AWS Redshift с использованием библиотеки psycopg2. Сценарий выполняется вручную с моей локальной рабочей станции. Процесс выглядит следующим образом:

  • Создание соединения с базой данных с помощью psycopg2.connect ()
  • Выполнение автоматических подтверждений запросов к базе данных с помощью execute ()
  • Закрытие соединения .

По разным причинам база данных может быть недоступна (проблема с сетью, многие запросы уже запущены ...), и лучше остановить сценарий Python. На этом этапе я затем уничтожаю уже зафиксированные (и незавершенные) запросы через SQL клиент (SQL верстак), получая pid, связанный с этими запросами. Я хотел бы автоматизировать последний шаг непосредственно в сценарии Python, когда пользователь останавливает его (ctrl + c). Поток будет выглядеть следующим образом:

  • Создать соединение с базой данных с помощью psycopg2.connect ()
  • Выполнить автоматически принятые запросы к базе данных с помощью execute ()
  • Сохранить текущий PID, связанный с запросом с помощью info.backend_pid Атрибут соединения
  • Если получено исключение InterruptKey, завершить текущий запрос, используя ранее сохраненный PID
  • Закрыть соединение.

Я провел некоторый тест на ноутбуке, чтобы проверить, могу ли я получить информацию о back_pid:

log = logging.getLogger(__name__)
session = psycopg2.connect(
    connection_factory=LoggingConnection,
        host=host,
        port=port,
        dbname=database,
        user=user,
        password=password,
        sslmode="require",
)
session.initialize(log)
session.set_session(autocommit=True)

query = """
CREATE OR REPLACE FUNCTION janky_sleep (x float) RETURNS bool IMMUTABLE as $$
    from time import sleep
    sleep(x)
    return True
$$ LANGUAGE plpythonu;
"""

cur = session.cursor()
cur.execute(query)
cur.execute("select janky_sleep(60.0)")

Я использовал функцию сна для репликации поведения запроса, который займет 60 секунд до конца sh. При получении backend_pid следующим образом:

session.info.backend_pid

Проблема заключается в том, что объект сеанса уже используется методом execute () (выполняется запрос), а информация backend_pid появляется только тогда, когда Сессия свободна, т.е. когда запрос завершен.

Я думал о том, чтобы запустить параллельный Python процесс, который будет отслеживать родительский процесс. После остановки родительского процесса дочерний процесс получает backend_pid через второе соединение с базой данных, а затем запускает запрос на удаление. Однако этот подход кажется излишним.

Как правильно решить эту ситуацию?

Спасибо

1 Ответ

0 голосов
/ 15 января 2020

Я наконец-то использовал ресурс, найденный в документации: http://initd.org/psycopg/docs/faq.html#faq -interrupt-query . Это позволяет Psycopg2 получать сигнал SIGINT и убивать последующие запросы.

>>> psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
>>> cnn = psycopg2.connect('')
>>> cur = cnn.cursor()
>>> cur.execute("select pg_sleep(10)")
^C
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  QueryCanceledError: canceling statement due to user request

>>> cnn.rollback()
>>> # You can use the connection and cursor again from here
...