Запуск Python 3.7.3 в Windows,
У меня есть ситуация, когда цикл событий asyncio
никогда не прерывается из процесса, вызванного многопроцессорностью.Я не могу показать весь код, но это так:
- Я использую
multiprocessing
для ускорения запросов с использованием стороннего API. - Этот API
thirdparty.api
поддерживает архитектуру сервер-клиент и использует внутренний цикл событий asyncio.Он запускает цикл обработки событий в отдельном потоке;в этом потоке он вызывает event_loop.run_forever()
и прерывается только на KeyboardInterrupt
. - Запуск рабочего сценария с использованием многопроцессорной обработки, API всегда возвращает, будь то успех или неудача.Ранее я сталкивался с регрессией Py3.7.2, когда в Windows исполняемый файл venv Python работает некорректно https://bugs.python.org/issue35797. Но теперь это исправлено в Py3.7.3, и моя проблема сохраняется.
- Запуск этого сценария издругой скрипт Py27, использующий
subprocess
.Внутри моего многопроцессорного рабочего процесса, если запрос не удался, вызов никогда не возвращается, и он не может выйти из рабочего процесса естественным образом, даже универсальный обработчик исключений ничего не поймает и застрянет.
фрагменты кода моего скрипта вызывающей стороны:
#!/usr/bin/env python2
def main()
try:
cmd = ['C:\\Python\\Python37\\pythonw.exe', 'worker.py']
print(' '.join(cmd))
proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, err = proc.communicate()
except subprocess.CalledProcessError as err:
print(err)
except Exception:
traceback.print_exc()
else:
print('try popen finished with else.')
print('stdout: {}'.format(out))
print('stderr: {}'.format(err))
if __name__ == '__main__':
main()
фрагменты псевдокода моей рабочей функции worker.py
выглядят следующим образом:
#!/usr/bin/env python3
args = [
...
]
def worker(*mpargs):
with thirdparty.api() as myclient:
try:
myclient.query(*args)
except Exception:
traceback.print_exc()
def exception_worker(*mpargs)
raise RuntimeError('Making trouble!')
def main():
logging.info('STARTED!')
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
results = pool.map(worker, args)
# results = pool.map(exception_worker, args)
pool.close()
pool.join()
logging.info('ALL DONE!')
if __name__ == '__main__':
main()
thirdparty.api запускает цикл событий в своемКонструктор:
self.loop = asyncio.get_event_loop()
if self.loop.is_closed():
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
затем в отдельном потоке:
try:
self._loop.run_forever()
except KeyboardInterrupt:
pass
self.loop.close()
Я пробовал другого работника exception_worker
, который просто генерирует исключения, и этот возвращается без проблем.
Как мне решить это?