Python: multiprocessing.map завершается с ошибкой queue.FULL - PullRequest
0 голосов
/ 24 апреля 2018

Я использую функцию map() из

 from concurrent.futures import ProcessPoolExecutor

для простого распараллеливания данных.

Я хочу обработать 400 файлов, используя map(), чтобы вызвать для них функцию обработки.

  infiles = glob.glob(os.path.join(input_path, '**/*.xls'), recursive=True) + glob.glob(os.path.join(input_path, '**/*.xlsx'), recursive=True) 
  outfiles = [os.path.join(os.path.dirname(infile), os.path.basename(infile).split('.')[0]+'.csv') for infile in infiles]

  with ProcessPoolExecutor(max_workers=None) as executor:
      executor.map(excel2csv, infiles, outfiles)

, поэтому excel2csv() должен вызываться для каждого файла, передавая желаемый путь ввода и вывода. Он обрабатывает каждый файл независимо, записывает результаты на диск и ничего не возвращает.

После примерно 100 файлов приложение выдает исключение, жалуясь на полную очередь.

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/concurrent/futures/process.py", line 295, in _queue_management_worker
    shutdown_worker()
  File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/concurrent/futures/process.py", line 253, in shutdown_worker
    call_queue.put_nowait(None)
  File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/multiprocessing/queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full

Наиболее похожая проблема, которую я обнаружил, обсуждается здесь .

Но в моем случае данные, передаваемые в функцию worker , минимальны (содержат две строки). Проверка размера очереди по умолчанию (из _multiprocessing.SemLock.SEM_VALUE_MAX), стоимость которого превышает 400.

Есть идеи? Спасибо

1 Ответ

0 голосов
/ 25 апреля 2018

Я обнаружил, что ошибка вызвана исключениями, созданными в рабочей функции, вызываемой executor.map ().

Кажется, что исключения потребляются? от executor.map (), и я предполагаю, что это каким-то образом заполнило очередь.

Мое решение состоит в том, чтобы решить проблему в excel2csv () и включить обобщенную обработку исключений типа try catch, которая не вызовет очередьзаполнить.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...