Многопроцессорная обработка Python - уменьшается ли количество процессов в пуле при ошибке? - PullRequest
2 голосов
/ 06 марта 2019

код:

import multiprocessing
print(f'num cpus {multiprocessing.cpu_count():d}')
import sys; print(f'Python {sys.version} on {sys.platform}')

def _process(m):
    print(m) #; return m
    raise ValueError(m)

args_list = [[i] for i in range(1, 20)]

if __name__ == '__main__':
    with multiprocessing.Pool(2) as p:
        print([r for r in p.starmap(_process, args_list)])

печать:

num cpus 8
Python 3.7.1 (v3.7.1:260ec2c36a, Oct 20 2018, 03:13:28) 
[Clang 6.0 (clang-600.0.57)] on darwin
1
7
4
10
13
16
19
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 47, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/Users/ubik-mac13/Library/Preferences/PyCharm2018.3/scratches/multiprocess_error.py", line 8, in _process
    raise ValueError(m)
ValueError: 1
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/ubik-mac13/Library/Preferences/PyCharm2018.3/scratches/multiprocess_error.py", line 18, in <module>
    print([r for r in p.starmap(_process, args_list)])
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 298, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 683, in get
    raise self._value
ValueError: 1

Process finished with exit code 1

При увеличении числа процессов в пуле до 3 или 4 выводятся все нечетные числа (возможно, не по порядку):

1
3
5
9
11
7
13
15
17
19

в то время как от 5 и выше он печатает весь диапазон 1-19. Так что здесь происходит? Сбои процессов после ряда сбоев?

Это, конечно, игрушечный пример, но он связан с реальной проблемой, с которой я столкнулся - из-за того, что многопроцессорный пул оставался на несколько дней стабильно, загрузка процессора снижалась, как будто некоторые процессы были убиты (обратите внимание, что загрузка процессора идет вниз 03/04 и 03/06, когда еще предстояло выполнить много заданий):

cpu utilization

Когда код завершился, он подарил мне одну (и только одну, как здесь, в то время как процессов было намного больше) multiprocessing.pool.RemoteTraceback - бонусный вопрос - это случайная трассировка? В этом игрушечном примере это обычно ValueError: 1, но иногда появляются и другие числа. Сохраняет ли многопроцессорность первую трассировку от первого сбойного процесса?

Ответы [ 2 ]

2 голосов
/ 07 марта 2019

Нет, просто взрывается целая задача, а не сам процесс.Наблюдаемое вами поведение в вашем примере с игрушкой объяснимо получающимися размерами фрагментов для комбинации количества рабочих и длины повторяемого элемента.Когда вы берете функцию calc_chunksize_info из здесь , вы можете увидеть разницу в результирующих размерах фрагментов:

calc_chunksize_info(n_workers=2, len_iterable=20)
# Chunkinfo(n_workers=2, len_iterable=20, n_chunks=7, chunksize=3, last_chunk=2)

calc_chunksize_info(n_workers=5, len_iterable=20)
# Chunkinfo(n_workers=5, len_iterable=20, n_chunks=20, chunksize=1, last_chunk=1) 

В случае, если размер фрагмента будет> 1, все нетронутыми "taskels "(1. Определения: Taskel) внутри задачи также теряются, так как вскоре первое taskel вызывает исключение.Обработайте ожидаемые исключения непосредственно в вашей целевой функции или напишите дополнительную оболочку для обработки ошибок, чтобы предотвратить это.

Когда код завершился, он предоставил мне одну (и одну только как здесь, в то время как процессыбыло еще много) multiprocessing.pool.RemoteTraceback - бонусный вопрос - это случайная трассировка?В этом игрушечном примере это обычно ValueError: 1, но иногда появляются и другие числа.Сохраняет ли многопроцессорность первую трассировку от первого сбойного процесса?

Рабочие процессы получают задачи из общей очереди.Чтение из очереди является последовательным, поэтому задание 1 всегда будет считываться перед заданием 2. Однако невозможно предсказать, в каком порядке результаты будут готовы для рабочих.В игру вовлечено много аппаратных и зависящих от ОС факторов, поэтому да, трассировка является случайной, так как порядок результатов является случайным, так как (строковая) трассировка является частью результата, возвращаемого родителю.Результаты также отправляются обратно через общую очередь, а Pool внутренне обрабатывает возвращаемые задачи JIT.В случае неудачного завершения задания все задание помечается как неуспешное, а последующие поступающие задачи отбрасываются.Только первое полученное исключение повторно вызывается в родительском объекте, как только все задачи в задании возвращаются.

2 голосов
/ 06 марта 2019

Быстрый эксперимент с watch ps aux в одном окне и вашим кодом в другом, кажется, говорит, что нет, исключения не приводят к аварийному завершению дочерних процессов.

Объект MapResult, который лежит в основе операций map / starmap, собирает только первое исключение и рассматривает все задание карты как сбой, если какое-либо задание не выполняется с исключением.

(Сколько заданий отправляется каждому работнику для работы, регулируется параметром chunksize для .map() и друзьями.)

Если вы хотите что-то более устойчивое к исключениям, вы можете просто использовать .apply_async():

import multiprocessing
import os

def _process(m):
    if m % 2 == 0:
        raise ValueError('I only work on odd numbers')
    return m * 8


if __name__ == '__main__':
    args_list = list(range(1, 20))
    with multiprocessing.Pool(2) as p:
        params_and_jobs = [((arg,), p.apply_async(_process, (arg,))) for arg in args_list]
        for params, job in params_and_jobs:
            job.wait()
            # regularly you'd use `job.get()`, but it would `raise` the exception,
            # which is not suitable for this example, so we dig in deeper and just use
            # the `._value` it'd return or raise:
            print(params, type(job._value), job._value)

выходы

(1,) <class 'int'> 8
(2,) <class 'ValueError'> I only work on odd numbers
(3,) <class 'int'> 24
(4,) <class 'ValueError'> I only work on odd numbers
(5,) <class 'int'> 40
(6,) <class 'ValueError'> I only work on odd numbers
(7,) <class 'int'> 56
(8,) <class 'ValueError'> I only work on odd numbers
(9,) <class 'int'> 72
(10,) <class 'ValueError'> I only work on odd numbers
(11,) <class 'int'> 88
(12,) <class 'ValueError'> I only work on odd numbers
(13,) <class 'int'> 104
(14,) <class 'ValueError'> I only work on odd numbers
(15,) <class 'int'> 120
(16,) <class 'ValueError'> I only work on odd numbers
(17,) <class 'int'> 136
(18,) <class 'ValueError'> I only work on odd numbers
(19,) <class 'int'> 152
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...