Запуск сценария через подпроцесс зависает, когда сценарий запускает asyncio eventloop внутри многопроцессорных рабочих - PullRequest
0 голосов
/ 31 мая 2019

Запуск Python 3.7.3 в Windows,

У меня есть ситуация, когда цикл событий asyncio никогда не прерывается из процесса, вызванного многопроцессорностью.Я не могу показать весь код, но это так:

  1. Я использую multiprocessing для ускорения запросов с использованием стороннего API.
  2. Этот API thirdparty.apiподдерживает архитектуру сервер-клиент и использует внутренний цикл событий asyncio.Он запускает цикл обработки событий в отдельном потоке;в этом потоке он вызывает event_loop.run_forever() и прерывается только на KeyboardInterrupt.
  3. Запуск рабочего сценария с использованием многопроцессорной обработки, API всегда возвращает, будь то успех или неудача.Ранее я сталкивался с регрессией Py3.7.2, когда в Windows исполняемый файл venv Python работает некорректно https://bugs.python.org/issue35797. Но теперь это исправлено в Py3.7.3, и моя проблема сохраняется.
  4. Запуск этого сценария издругой скрипт Py27, использующий subprocess.Внутри моего многопроцессорного рабочего процесса, если запрос не удался, вызов никогда не возвращается, и он не может выйти из рабочего процесса естественным образом, даже универсальный обработчик исключений ничего не поймает и застрянет.

фрагменты кода моего скрипта вызывающей стороны:

#!/usr/bin/env python2

def main()
    try:
        cmd = ['C:\\Python\\Python37\\pythonw.exe', 'worker.py']
        print(' '.join(cmd))
        proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out, err = proc.communicate()
    except subprocess.CalledProcessError as err:
        print(err)
    except Exception:
        traceback.print_exc()
    else:
        print('try popen finished with else.')
    print('stdout: {}'.format(out))
    print('stderr: {}'.format(err))


if __name__ == '__main__':
    main()

фрагменты псевдокода моей рабочей функции worker.py выглядят следующим образом:

#!/usr/bin/env python3

args = [
   ...
]


def worker(*mpargs):
    with thirdparty.api() as myclient:
        try:
            myclient.query(*args)
        except Exception:
            traceback.print_exc()


def exception_worker(*mpargs)
    raise RuntimeError('Making trouble!')


def main():
    logging.info('STARTED!')
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(worker, args)
        # results = pool.map(exception_worker, args)
        pool.close()
        pool.join()
    logging.info('ALL DONE!')


if __name__ == '__main__':
    main()

thirdparty.api запускает цикл событий в своемКонструктор:

self.loop = asyncio.get_event_loop()
if self.loop.is_closed():
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)

затем в отдельном потоке:

try:
    self._loop.run_forever()
except KeyboardInterrupt:
    pass
self.loop.close()

Я пробовал другого работника exception_worker, который просто генерирует исключения, и этот возвращается без проблем.

Как мне решить это?

1 Ответ

0 голосов
/ 05 июня 2019

После подробного описания проблемы я наконец-то нашел решение в этом посте: Почему я получаю NotImplementedError с асинхронным ожиданием в Windows?

thirdparty.api, необходимый для этогоподробно и после исправления моя проблема исчезла.

...