При импорте командных запросов вместе с concurrent.futures.ProcessPoolExecutor создает исключение в Python 3 - PullRequest
0 голосов
/ 12 февраля 2020

Я новичок в многопроцессорной обработке и пытаюсь выполнить вычисление для словаря, используя concurrent.futures.ProcessPoolExecutor (). Вот самое простое воспроизведение моего кода, который работает как задумано:

#import grequests
import concurrent.futures

def doSomething(n):
    return n ** 2

myList = [0, 1, 2, 3]
myDict = {0:0,1:1,2:2,3:3}

anotherDict = {}
futures = {}

with concurrent.futures.ProcessPoolExecutor() as pool:
    for id in myList:
        returnObj = pool.submit(doSomething, myDict[id])
        futures[id] = returnObj

for id in futures.keys():
    anotherDict[id] = futures[id].result()

print(anotherDict)

Вывод правильный: {0: 0, 1: 1, 2: 4, 3: 9}

Однако мне нужно использовать греквесты в других местах программы (я смог чтобы определить это виновник). Импортирование командных запросов в приведенном выше коде и повторный запуск приводит к следующей трассировке:

Exception in thread QueueManagerThread:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 440, in _queue_management_worker
    shutdown_worker()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 344, in shutdown_worker
    p.join()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 140, in join
    res = self._popen.wait(timeout)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 48, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File "/Users/joecat/venvjc/lib/python3.7/site-packages/gevent/os.py", line 368, in waitpid
    get_hub().wait(new_watcher)
  File "src/gevent/_hub_primitives.py", line 46, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_hub_primitives.py", line 55, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_waiter.py", line 151, in gevent.__waiter.Waiter.get
  File "src/gevent/_greenlet_primitives.py", line 60, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 60, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 64, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
gevent.exceptions.LoopExit: This operation would block forever
    Hub: <Hub '' at 0x103b57590 select pending=0 ref=0 thread_ident=0x700007c42000>
    Handles:
[]

{0: 0, 1: 1, 2: 4, 3: 9}
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 102, in _python_exit
    thread_wakeup.wakeup()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 90, in wakeup
    self._writer.send_bytes(b"")
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 183, in send_bytes
    self._check_closed()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed

Могу ли я что-нибудь сделать, чтобы избежать этой ошибки или указать c версии grequests / concurrent.futures, которые могут работать вместе ? Связано ли это с gevent или greenlet?

  • версия gevent - 1.4.0
  • версия greenlet - 0.4.15
  • версия grequests - 0.4.0

Код по-прежнему работает правильно, и программа продолжает работать без сбоев, но я бы предпочел, чтобы это исключение не регистрировалось каждый раз.

Кроме того, поскольку я новичок в многопроцессорной обработке, любые отзывы о том, как построен мой ProcessPoolExecutor, приветствуются. Я все еще учусь.

...