Отмена многопроцессорной обработки. Пул с SIGINT - PullRequest
0 голосов
/ 08 мая 2018

Моя цель - вычислить несколько точек данных параллельно и асинхронно записать их на устройство ввода-вывода. Точки данных не могут быть записаны сами по себе, но должны быть обернуты; Точнее, я пишу массив JSON, и символы [] вокруг структур требуются.

Мой текущий подход заключается в использовании multiprocessing.Pool и apply_async для вычисления точек данных. Используя функцию обратного вызова, точки данных затем отправляются на multiprocessing.Queue, в то время как отдельный поток синхронно извлекает элементы из очереди и записывает их на устройство ввода-вывода.

Может случиться так, что мне нужно отменить процесс с помощью SIGINT. В этом случае я хочу, чтобы вычисление завершилось благополучно, то есть остановило вычисление всех процессов Pool, но завершило запись всех оставшихся элементов в очереди и символа ].

Пока мне не удалось найти рабочее решение моей проблемы. В текущем состоянии мои две проблемы:

  • Иногда процесс не завершается. SIGINT_handler вызывается, но процесс не завершается. Я не могу проверить, откуда это происходит, но я предполагаю, что это может быть тупик в очереди? Я не знаю, как это предотвратить.
  • Насколько я понимаю, pool.terminate() отправляет SIGTERM всем дочерним процессам. Это, очевидно, вызывает исключение KeyboardInterrupt в каждом из них, загромождая терминал дюжиной следов стека.

Мой код можно найти ниже.

# Initialize the worker pool and necessary variables.
pool = multiprocessing.Pool(os.cpu_count() - 1)
data_queue = multiprocessing.Queue()
counter_lock = threading.Lock()
threads_todo = args.autnum

# This function is executed after each successful experiment.
def apply_finished(data):
    data_queue.put(data)
    with counter_lock:
        nonlocal threads_todo
        threads_todo -= 1

# Start the pool.
for i in range(args.autnum):
    pool.apply_async(collect_data, (args,), callback=apply_finished)
pool.close()

# This function is called if SIGINT is send to this process.
def SIGINT_handler(sig, frame):
    sys.stderr.write("SIGINT received. Cancelling...")
    sys.stderr.flush()
    pool.terminate()
    with counter_lock:
        nonlocal threads_todo
        threads_todo = 0
signal.signal(signal.SIGINT, SIGINT_handler)

# Write the data to stdout until all workers terminate or a SIGINT is received.
sys.stdout.write("[\n")
while threads_todo > 0 or not data_queue.empty():
    try:
        data = data_queue.get(True, 1)
        s = data.decode('utf-8')
        sys.stdout.write(s)
        sys.stdout.flush()
    except queue.Empty:
        data = None
sys.stdout.write("]")
...