Многопроцессорная защита от случайных сбоев - PullRequest
0 голосов
/ 12 октября 2018

У меня есть 100-1000 временных рядов и довольно дорогая симуляция, которую я хотел бы распараллелить.Тем не менее, библиотека, которую я использую, зависает в редких случаях, и я хотел бы сделать ее устойчивой к этим проблемам.Это текущая настройка:

with Pool() as pool:
    res = pool.map_async(simulation_that_occasionally_hangs, (p for p in paths))
    all_costs = res.get()

Я знаю, get() имеет параметр timeout, но если я правильно понимаю, это работает на весь процесс из 1000 путей.Я хотел бы проверить, занимает ли какое-либо одиночное моделирование больше 5 минут (обычный путь занимает 4 секунды), и если это так, просто остановите этот путь и продолжайте до get() остальных.

РЕДАКТИРОВАТЬ:

Тайм-аут тестирования в pebble

def fibonacci(n):
    if n == 0: return 0
    elif n == 1: return 1
    else: return fibonacci(n - 1) + fibonacci(n - 2)


def main():
    with ProcessPool() as pool:
        future = pool.map(fibonacci, range(40), timeout=10)
        iterator = future.result()

        all = []
        while True:
            try:
                all.append(next(iterator))
            except StopIteration:
                break
            except TimeoutError as e:
                print(f'function took longer than {e.args[1]} seconds')

        print(all)

Ошибки:

RuntimeError: I/O operations still in flight while destroying Overlapped object, the process may crash
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\anaconda3\lib\multiprocessing\spawn.py", line 99, in spawn_main
    new_handle = reduction.steal_handle(parent_pid, pipe_handle)
  File "C:\anaconda3\lib\multiprocessing\reduction.py", line 87, in steal_handle
    _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
PermissionError: [WinError 5] Access is denied

Ответы [ 2 ]

0 голосов
/ 13 октября 2018

Библиотека pebble была разработана для решения подобных проблем.Он прозрачно обрабатывает тайм-ауты заданий и сбои, такие как сбои библиотеки C.

Вы можете проверить документацию примеров, чтобы увидеть, как ее использовать.Он имеет интерфейс, аналогичный concurrent.futures.

0 голосов
/ 12 октября 2018

Вероятно, самый простой способ - запустить каждое тяжелое моделирование в отдельном подпроцессе, а родительский процесс будет наблюдать за ним.В частности:

def risky_simulation(path):
    ...

def safe_simulation(path):
    p = multiprocessing.Process(target=risky_simulation, args=(path,))
    p.start()
    p.join(timeout)  # Your timeout here
    p.kill()  # or p.terminate()
    # Here read and return the output of the simulation
    # Can be from a file, or using some communication object
    # between processes, from the `multiprocessing` module

with Pool() as pool:
    res = pool.map_async(safe_simulation, paths)
    all_costs = res.get()

Примечания:

  1. Если моделирование может зависнуть, вы можете запустить его в отдельном процессе (т. Е. Объект Process не должен быть потоком), в зависимости от того, как это делается, он может перехватить GIL.
  2. Это решение использует пул только для непосредственных подпроцессов, но вычисления выгружаются в новые процессы.Мы также можем убедиться, что вычисления разделяют пул, но это приведет к более уродливому коду, поэтому я его пропустил.
...