Завершение блока try в python через n секунд - PullRequest
1 голос
/ 13 июля 2020

Я пытаюсь наложить исключение TimeoutException на оператор try через n секунд. Я нашел библиотеку, которая обрабатывает этот вызванный сигнал, что было бы идеально, но я столкнулся с ошибкой, которую мне трудно обойти. ( На этот аналогичный вопрос SO отвечает библиотека сигналов.)

Это сводный код, представляющий проблему:

import multiprocessing
from multiprocessing.dummy import Pool

def main():
    listOfLinks = []
    threadpool = Pool(2)
    info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
    threadpool.close()

def processRunSeveralTimesInParallel(listOfLinks):
    #The following is pseudo code representing what I would like to do:
    loongSequenceOfInstructions()
    for i in range(0,10):
        try for n seconds:
            doSomething(i)
        except (after n seconds):
            handleException()

    return something

При реализации Решение вышеупомянутого вопроса с библиотекой сигналов, я получаю следующую ошибку:

File "file.py", line 388, in main
    info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 372, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "file.py", line 193, in processRunSeveralTimesInParallel
    signal.signal(signal.SIGALRM, signal_handler)
  File "/Users/user/anaconda3/envs/proj/lib/python3.8/signal.py", line 47, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread

Есть идеи, как ограничить время только на блоке try в методе, запущенном как поток? Спасибо!

Важная информация:

  • Я использую многопроцессорную библиотеку для параллельного запуска нескольких процессов одновременно. Из приведенного выше сообщения об ошибке я подозреваю, что конфликтуют библиотеки сигналов и многопроцессорности.
  • В операторе try используются методы selenium (find_element_by_xpath) . Однако нет доступных аргументов тайм-аута.

1 Ответ

2 голосов
/ 13 июля 2020

Недавно обновленный ответ

Если вы ищете способ поиска тайм-аута без использования сигналов, вот один способ. Во-первых, поскольку вы используете многопоточность, давайте сделаем это явным и воспользуемся модулем concurrent.futures, который обладает большой гибкостью.

Когда «задание» отправляется исполнителю пула, a Future instance возвращается немедленно без блокировки до тех пор, пока для этого экземпляра не будет выполнен вызов result. Вы можете указать значение timeout, чтобы, если результат недоступен в течение периода ожидания, будет выдано исключение. Идея состоит в том, чтобы передать рабочему потоку экземпляр ThreadPoolExecutor и запустить критический фрагмент кода, который должен быть завершен в течение определенного периода времени в его собственном рабочем потоке. Экземпляр Future будет создан для этого временного кода, но на этот раз вызов result будет указывать значение timeout:

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time


def main():
    listOfLinks = ['a', 'b', 'c', 'd', 'e']
    futures = []
    """
    To prevent timeout errors due to lack of threads, you need at least one extra thread
    in addition to the ones being created here so that at least one time_critical thread
    can start. Of course, ideally you would like all the time_critical threads to be able to
    start without waiting. So, whereas the minimum number of max_workers would be 6 in this
    case, the ideal number would be 5 * 2 = 10.
    """
    with ThreadPoolExecutor(max_workers=10) as executor:
        # pass executor to our worker
        futures = [executor.submit(processRunSeveralTimesInParallel, tuple, executor) for tuple in enumerate(listOfLinks)]
        for future in futures:
            result = future.result()
            print('result is', result)


def processRunSeveralTimesInParallel(tuple, executor):
    link_number = tuple[0]
    link = tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            future.result(timeout=2) # time_critical does not return a result other than None
        except TimeoutError:
            handle_exception(link, i)
    return link * link_number


def time_critical(link, trial_number):
    if link == 'd' and trial_number == 7:
        time.sleep(3) # generate a TimeoutError


def handle_exception(link, trial_number):
    print(f'There was a timeout for link {link}, trial number {trial_number}.')


if __name__ == '__main__':
    main()

Выводит:

result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee

Использование потоков и многопроцессорность

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError
import os
import time


def main():
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    futures = []
    cpu_count = os.cpu_count()
    with ThreadPoolExecutor(max_workers=cpu_count) as thread_executor, ProcessPoolExecutor(max_workers=cpu_count) as process_executor:
        # pass executor to our worker
        futures = [thread_executor.submit(processRunSeveralTimesInParallel, tuple, process_executor) for tuple in enumerate(listOfLinks)]
        for future in futures:
            result = future.result()
            print('result is', result)


def processRunSeveralTimesInParallel(tuple, executor):
    link_number = tuple[0]
    link = tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            future.result(timeout=2) # time_critical does not return a result other than None
        except TimeoutError:
            handle_exception(link, i)
    return link * link_number


def time_critical(link, trial_number):
    if link == 'd' and trial_number == 7:
        time.sleep(3) # generate a TimeoutError


def handle_exception(link, trial_number):
    print(f'There was a timeout for link {link}, trial number {trial_number}.')


if __name__ == '__main__':
    main()

Печать:

result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj

Исключительно многопроцессорная обработка

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
import os
import time


def main():
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    futures = []
    workers = os.cpu_count() // 2
    with ProcessPoolExecutor(max_workers=workers) as process_executor:
        # pass executor to our worker
        futures = [process_executor.submit(processRunSeveralTimesInParallel, tuple) for tuple in enumerate(listOfLinks)]
        for future in futures:
            result = future.result()
            print('result is', result)


def processRunSeveralTimesInParallel(tuple):
    link_number = tuple[0]
    link = tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        p = Process(target=time_critical, args=(link, i))
        p.start()
        p.join(timeout=2) # don't block for more than 2 seconds
        if p.exitcode is None: # subprocess did not terminate
            p.terminate() # we will terminate it
            handle_exception(link, i)
    return link * link_number


def time_critical(link, trial_number):
    if link == 'd' and trial_number == 7:
        time.sleep(3) # generate a TimeoutError


def handle_exception(link, trial_number):
    print(f'There was a timeout for link {link}, trial number {trial_number}.')


if __name__ == '__main__':
    main()

Печать:

result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj
...