Тайм-аут запроса MySQLDB Python - PullRequest
       38

Тайм-аут запроса MySQLDB Python

9 голосов
/ 02 октября 2009

Я пытаюсь установить ограничение по времени для запросов в Python MySQLDB. У меня есть ситуация, когда я не контролирую запросы, но должен убедиться, что они не выполняются в течение установленного срока. Я пытался использовать signal.SIGALRM, чтобы прервать вызов, чтобы выполнить, но это не похоже на работу. Сигнал отправляется, но не перехватывается до завершения вызова для выполнения.

Я написал тестовый пример, чтобы доказать это поведение:

#!/usr/local/bin/python2.6

import time
import signal

from somewhere import get_dbc

class Timeout(Exception):
    """ Time Exceded """

def _alarm_handler(*args):
    raise Timeout

dbc = get_dbc()

signal.signal(signal.SIGALRM, _alarm_handler)
signal.alarm(1)

try:
    print "START:  ", time.time()
    dbc.execute("SELECT SLEEP(10)")
except Timeout:
    print "TIMEOUT!", time.time()'

"SELECT SLEEP (10)" имитирует медленный запрос, но я вижу то же поведение с реальным медленным запросом.

Результат:

START:   1254440686.69
TIMEOUT! 1254440696.69

Как видите, он спит в течение 10 секунд, после чего я получаю исключение тайм-аута.

Вопросы:

  1. Почему я не получаю сигнал до окончания выполнения?
  2. Есть ли другой надежный способ ограничить время выполнения запроса?

Ответы [ 6 ]

8 голосов
/ 02 октября 2009

@ nosklo решение на основе витой элегантно и выполнимо, но если вы хотите избежать зависимости от витой, задача все еще выполнима, например:

import multiprocessing

def query_with_timeout(dbc, timeout, query, *a, **k):
  conn1, conn2 = multiprocessing.Pipe(False)
  subproc = multiprocessing.Process(target=do_query,
                                    args=(dbc, query, conn2)+a, 
                                    kwargs=k)
  subproc.start()
  subproc.join(timeout)
  if conn1.poll():
    return conn1.recv()
  subproc.terminate()
  raise TimeoutError("Query %r ran for >%r" % (query, timeout))

def do_query(dbc, query, conn, *a, **k):
  cu = dbc.cursor()
  cu.execute(query, *a, **k)
  return cu.fetchall()
2 голосов
/ 22 октября 2009

Я пытался использовать signal.SIGALRM, чтобы прервать выполнение вызова, но, похоже, это не работает. Сигнал отправляется, но не перехватывается до тех пор, пока не завершится вызов для выполнения.

Библиотека mysql обрабатывает прерванные системные вызовы внутренне, поэтому вы не увидите побочных эффектов SIGALRM до тех пор, пока не завершится вызов API (если не считать уничтожение текущего потока или процесса)

Вы можете попробовать установить исправления MySQL-Python и использовать опцию MYSQL_OPT_READ_TIMEOUT (добавлено в mysql 5.0.25)

1 голос
/ 06 августа 2013

Общие примечания

В последнее время я сталкивался с той же проблемой при выполнении нескольких условий:

  • раствор должен быть поточно-безопасным
  • несколько соединений с базой данных с одной и той же машины могут быть активны одновременно, уничтожить одно соединение / запрос
  • содержит подключения ко многим различным базам данных - переносимый обработчик для каждого хоста БД

У нас было следующее расположение классов ( к сожалению, я не могу публиковать реальные источники ):

class AbstractModel: pass 
class FirstDatabaseModel(AbstractModel): pass # Connection to one DB host
class SecondDatabaseModel(AbstractModel): pass # Connection to one DB host

И создал несколько потоков для каждой модели.

<Ч />

Решение Python 3.2

В нашем приложении одна модель = одна база данных . Поэтому я создал « сервисное соединение » для каждой модели (чтобы мы могли выполнить KILL в параллельном соединении). Поэтому, если был создан один экземпляр FirstDatabaseModel, было создано 2 подключения к базе данных; если было создано 5 экземпляров, использовалось только 6 соединений:

class AbstractModel:
    _service_connection = None # Formal declaration

    def __init__(self):
        ''' Somehow load config and create connection
        '''
        self.config = # ...
        self.connection = MySQLFromConfig(self.config)
        self._init_service_connection()

        # Get connection ID (pseudocode)
        self.connection_id = self.connection.FetchOneCol('SELECT CONNECTION_ID()') 

    def _init_service_connection(self):
        ''' Initialize one singleton connection for model
        '''
        cls = type(self)
        if cls._service_connection is not None:
            return

        cls._service_connection = MySQLFromConfig(self.config)

Теперь нам нужен убийца:

def _kill_connection(self):
    # Add your own mysql data escaping
    sql = 'KILL CONNECTION {}'.format(self.connection_id)

    # Do your own connection check and renewal
    type(self)._service_connection.execute(sql)

Примечание: connection.execute = создать курсор, выполнить, закрыть курсор.

И сделать защитную нить безопасной, используя threading.Lock:

def _init_service_connection(self):
    ''' Initialize one singleton connection for model
    '''
    cls = type(self)
    if cls._service_connection is not None:
        return

    cls._service_connection = MySQLFromConfig(self.config)
    cls._service_connection_lock = threading.Lock()

def _kill_connection(self):
    # Add your own mysql data escaping
    sql = 'KILL CONNECTION {}'.format(self.connection_id)
    cls = type(self)

    # Do your own connection check and renewal
    try:
        cls._service_connection_lock.acquire()    
        cls._service_connection.execute(sql)
    finally:
        cls._service_connection_lock.release()

И, наконец, добавьте метод выполнения по времени, используя threading.Timer:

def timed_query(self, sql, timeout=5):
    kill_query_timer = threading.Timer(timeout, self._kill_connection)
    kill_query_timer.start()

    try:
        self.connection.long_query() 
    finally:
        kill_query_timer.cancel()
1 голос
/ 02 октября 2009

Используйте adbapi . Это позволяет выполнять асинхронный вызов БД.

from twisted.internet import reactor
from twisted.enterprise import adbapi

def bogusQuery():
    return dbpool.runQuery("SELECT SLEEP(10)")

def printResult(l):
    # function that would be called if it didn't time out
    for item in l:
        print item

def handle_timeout():
    # function that will be called when it timeout
    reactor.stop()

dbpool = adbapi.ConnectionPool("MySQLdb", user="me", password="myself", host="localhost", database="async")
bogusQuery().addCallback(printResult)
reactor.callLater(4, handle_timeout)
reactor.run()
1 голос
/ 02 октября 2009

Почему я не получаю сигнал до окончания выполнения?

Запрос выполняется через функцию C, которая блокирует выполнение Python VM до тех пор, пока не вернется.

Есть ли другой надежный способ ограничить время выполнения запроса?

Это (IMO) действительно ужасное решение, но оно работает . Вы можете выполнить запрос в отдельном процессе (либо через fork(), либо через модуль multiprocessing ). Запустите таймер будильника в вашем основном процессе, и, когда вы его получите, отправьте SIGINT или SIGKILL дочернему процессу. Если вы используете multiprocessing, вы можете использовать метод Process.terminate().

0 голосов
/ 02 октября 2009

Почему я не получаю сигнал до окончания выполнения?

Процесс, ожидающий сетевого ввода-вывода, находится в непрерывном состоянии (вещь UNIX, не связанная с Python или MySQL). Он получает сигнал после завершения системного вызова (вероятно, в виде EINTR кода ошибки, хотя я не уверен).

Есть ли другой надежный способ ограничить время выполнения запроса?

Я думаю, что обычно это делается внешним инструментом, таким как mkill, который отслеживает MySQL на предмет длительных запросов и убивает их.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...