Я новичок в многопроцессорной обработке и пытаюсь выполнить вычисление для словаря, используя concurrent.futures.ProcessPoolExecutor (). Вот самое простое воспроизведение моего кода, который работает как задумано:
#import grequests
import concurrent.futures
def doSomething(n):
return n ** 2
myList = [0, 1, 2, 3]
myDict = {0:0,1:1,2:2,3:3}
anotherDict = {}
futures = {}
with concurrent.futures.ProcessPoolExecutor() as pool:
for id in myList:
returnObj = pool.submit(doSomething, myDict[id])
futures[id] = returnObj
for id in futures.keys():
anotherDict[id] = futures[id].result()
print(anotherDict)
Вывод правильный: {0: 0, 1: 1, 2: 4, 3: 9}
Однако мне нужно использовать греквесты в других местах программы (я смог чтобы определить это виновник). Импортирование командных запросов в приведенном выше коде и повторный запуск приводит к следующей трассировке:
Exception in thread QueueManagerThread:
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 440, in _queue_management_worker
shutdown_worker()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 344, in shutdown_worker
p.join()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 140, in join
res = self._popen.wait(timeout)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 48, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 28, in poll
pid, sts = os.waitpid(self.pid, flag)
File "/Users/joecat/venvjc/lib/python3.7/site-packages/gevent/os.py", line 368, in waitpid
get_hub().wait(new_watcher)
File "src/gevent/_hub_primitives.py", line 46, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_hub_primitives.py", line 55, in gevent.__hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_waiter.py", line 151, in gevent.__waiter.Waiter.get
File "src/gevent/_greenlet_primitives.py", line 60, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 60, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 64, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
gevent.exceptions.LoopExit: This operation would block forever
Hub: <Hub '' at 0x103b57590 select pending=0 ref=0 thread_ident=0x700007c42000>
Handles:
[]
{0: 0, 1: 1, 2: 4, 3: 9}
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 102, in _python_exit
thread_wakeup.wakeup()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 90, in wakeup
self._writer.send_bytes(b"")
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 183, in send_bytes
self._check_closed()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 136, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
Могу ли я что-нибудь сделать, чтобы избежать этой ошибки или указать c версии grequests / concurrent.futures, которые могут работать вместе ? Связано ли это с gevent или greenlet?
- версия gevent - 1.4.0
- версия greenlet - 0.4.15
- версия grequests - 0.4.0
Код по-прежнему работает правильно, и программа продолжает работать без сбоев, но я бы предпочел, чтобы это исключение не регистрировалось каждый раз.
Кроме того, поскольку я новичок в многопроцессорной обработке, любые отзывы о том, как построен мой ProcessPoolExecutor, приветствуются. Я все еще учусь.