Почему асинхронный процесс Python в потоке кажется нестабильным в Linux? - PullRequest
0 голосов
/ 19 декабря 2018

Я пытаюсь запустить python3 асинхронную внешнюю команду из приложения Qt.Прежде чем я использовал многопроцессорный поток, чтобы сделать это без замораживания приложения Qt.Но теперь я хотел бы сделать это с QThread, чтобы иметь возможность засечь и дать QtWindows в качестве аргумента для некоторых других функций (не представленных здесь).Я сделал это и успешно проверил на моей Windows ОС, но я попробовал приложение на моей Linux ОС, я получил следующую ошибку: RuntimeError: Cannot add child handler, the child watcher does not have a loop attached

С этого момента я попытался изолироватьпроблема, и я получаю минимальный (насколько это возможно) пример ниже, который повторяет проблему.Конечно, как я упоминал ранее, если я заменю QThreadPool на список multiprocessing.thread, этот пример будет работать хорошо.Я также осознал кое-что, что поразило меня: если я раскомментирую строку rc = subp([sys.executable,"./HelloWorld.py"]) в последней части примера, это тоже работает.Я не мог объяснить себе, почему.

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

## IMPORTS ##
from functools import partial
from PyQt5 import QtCore
from PyQt5.QtCore import QThreadPool, QRunnable, QCoreApplication
import sys
import asyncio.subprocess

# Global variables
Qpool = QtCore.QThreadPool()


def subp(cmd_list):
    """ """

    if sys.platform.startswith('linux'):
        new_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(new_loop)
    elif sys.platform.startswith('win'):
        new_loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
        asyncio.set_event_loop(new_loop)
    else :
        print('[ERROR]     OS not available for encodage... EXIT')
        sys.exit(2)

    rc, stdout, stderr= new_loop.run_until_complete(get_subp(cmd_list) )
    new_loop.close()
    if rc!=0 :
        print('Exit not zero ({}): {}'.format(rc, sys.exc_info()[0]) )#, exc_info=True)
    return rc, stdout, stderr

async def get_subp(cmd_list):
    """ """

    print('subp: '+' '.join(cmd_list) )
    # Create the subprocess, redirect the standard output into a pipe
    create = asyncio.create_subprocess_exec(*cmd_list, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) #
    proc = await create

    # read child's stdout/stderr concurrently (capture and display)
    try:
        stdout, stderr = await asyncio.gather(
            read_stream_and_display(proc.stdout),
            read_stream_and_display(proc.stderr))
    except Exception:
        proc.kill()
        raise
    finally:
        rc = await proc.wait()
        print(" [Exit {}] ".format(rc)+' '.join(cmd_list))
    return rc, stdout, stderr

async def read_stream_and_display(stream):
    """ """
    async for line in stream:
        print(line, flush=True)

class Qrun_from_job(QtCore.QRunnable):
    def __init__(self, job, arg):
        super(Qrun_from_job, self).__init__()
        self.job=job
        self.arg=arg

    def run(self):
        code = partial(self.job)
        code()

def ThdSomething(job,arg):
    testRunnable = Qrun_from_job(job,arg)
    Qpool.start(testRunnable)

def testThatThing():
    rc = subp([sys.executable,"./HelloWorld.py"])


if __name__=='__main__':
    app = QCoreApplication([])
    # rc = subp([sys.executable,"./HelloWorld.py"])
    ThdSomething(testThatThing,'tests')
    sys.exit(app.exec_())

с файлом HelloWorld.py:

#!/usr/bin/env python3
import sys
if __name__=='__main__':
   print('HelloWorld')
   sys.exit(0)

Поэтому у меня два вопроса: Как заставить этот пример работать правильно с QThread?И почему предыдущий вызов асинхронной задачи (с вызовом функции subp) изменил стабильность примера в Linux?

EDIT

Следуя советам @ user4815162342, я пытался сrun_coroutine_threadsafe с кодом ниже.Но он не работает и возвращает ту же ошибку, т.е. RuntimeError: Cannot add child handler, the child watcher does not have a loop attached.Я также попытался изменить команду threading на ее эквивалент в модуле mutliprocessing;и с последним команда subp никогда не запускается.

Код:

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

## IMPORTS ##
import sys
import asyncio.subprocess
import threading
import multiprocessing

# at top-level
loop = asyncio.new_event_loop()

def spin_loop():
    asyncio.set_event_loop(loop)
    loop.run_forever()

def subp(cmd_list):
    # submit the task to asyncio
    fut = asyncio.run_coroutine_threadsafe(get_subp(cmd_list), loop)
    # wait for the task to finish
    rc, stdout, stderr = fut.result()
    return rc, stdout, stderr

async def get_subp(cmd_list):
    """ """
    print('subp: '+' '.join(cmd_list) )
    # Create the subprocess, redirect the standard output into a pipe
    proc = await asyncio.create_subprocess_exec(*cmd_list, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) #

    # read child's stdout/stderr concurrently (capture and display)
    try:
        stdout, stderr = await asyncio.gather(
            read_stream_and_display(proc.stdout),
            read_stream_and_display(proc.stderr))
    except Exception:
        proc.kill()
        raise
    finally:
        rc = await proc.wait()
        print(" [Exit {}] ".format(rc)+' '.join(cmd_list))
    return rc, stdout, stderr

async def read_stream_and_display(stream):
    """ """
    async for line in stream:
        print(line, flush=True)

if __name__=='__main__':
    threading.Thread(target=spin_loop, daemon=True).start()
    # multiprocessing.Process(target=spin_loop, daemon=True).start()
    print('thread passed')
    rc = subp([sys.executable,"./HelloWorld.py"])
    print('end')
    sys.exit(0)

Ответы [ 2 ]

0 голосов
/ 21 декабря 2018

Я подозреваю, что то, что вы делаете, просто не поддерживается - в соответствии с документацией :

Для обработки сигналов и выполнения подпроцессов цикл обработки событий должен быть запущен восновной поток.

Поскольку вы пытаетесь выполнить подпроцесс, я не думаю, что запуск нового цикла событий в другом потоке работает.

Дело в том, что Qt уже имеет цикл событийи что вам действительно нужно, так это убедить asyncio использовать его.Это означает, что вам нужна реализация цикла событий, которая предоставляет «интерфейс цикла событий для asyncio», реализованный поверх «цикла событий Qt».

Я считаю, что asyncqt обеспечивает такую ​​реализацию.Вы можете попробовать использовать QEventLoop(app) вместо asyncio.new_event_loop().

0 голосов
/ 21 декабря 2018

В качестве общего принципа проектирования, нет необходимости и расточительно создавать новые циклы событий только для запуска одной подпрограммы.Вместо этого создайте цикл обработки событий, запустите его в отдельном потоке и используйте его для всех своих асинхронных задач, отправив ему задачи с помощью asyncio.run_coroutine_threadsafe.

Например:

# at top-level
loop = asyncio.new_event_loop()
def spin_loop():
    asyncio.set_event_loop(loop)
    loop.run_forever()

asyncio.get_child_watcher().attach_loop(loop)
threading.Thread(target=spin_loop, daemon=True).start()
# ... the rest of your code ...

Имея это в виду, вы можете легко выполнить любой асинхронный код из любого потока, используя следующее:

def subp(cmd_list):
    # submit the task to asyncio
    fut = asyncio.run_coroutine_threadsafe(get_subp(cmd_list), loop)
    # wait for the task to finish
    rc, stdout, stderr = fut.result()
    return rc, stdout, stderr

Обратите внимание, что вы можете использовать add_done_callback, чтобы получать уведомления при возврате в будущемна asyncio.run_coroutine_threadsafe заканчивается, поэтому вам может не понадобиться поток в первую очередь.

Обратите внимание, что все взаимодействие с циклом событий должно проходить либо через вышеупомянутый run_coroutine_threadsafe (при отправке сопрограмм), либо черезloop.call_soon_threadsafe когда вам нужен цикл обработки событий для вызова обычной функции.Например, чтобы остановить цикл обработки событий, вы должны вызвать loop.call_soon_threadsafe(loop.stop).

...