Почему asyncio subprocess.communicate зависает при вызове в другом потоке? - PullRequest
0 голосов
/ 21 декабря 2018

У меня есть ситуация, когда связь подпроцесса зависает, когда мне нужно запустить подпроцесс в цикле событий asyncio, и все это находится в отдельном потоке.

Я узнал, что для запуска подпроцесса в отдельном потоке мне нужно иметь

1. an event loop running in main thread, and
2. a child watcher must be initiated in main thread.

После выполнения вышеуказанных условий я получил свою подпроцессную работу.Но subprocess.communicate сейчас висит.Тот же код работает, если вызывать его из основного потока.

После копания я заметил, что общение зависает, потому что процесс не завершается сам по себе.ie await process.wait() фактически зависает.

Я видел зависание связи, когда команда, которую я пытаюсь выполнить в самом подпроцессе, зависает, но здесь это не так.

import asyncio
import shlex
import threading
import subprocess
async def sendcmd(cmd):
    cmdseq = tuple(shlex.split(cmd))
    print(cmd)
    p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(p.pid)
    output = (await asyncio.wait_for(p.communicate(), 5))[0]
    output = output.decode('utf8')
    print(output)
    return output


async def myfunc(cmd):
    o = await sendcmd(cmd)
    return o

def myfunc2():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = []
    tasks.append(asyncio.ensure_future(myfunc('uname -a')))
    loop.run_until_complete(asyncio.gather(*tasks))

async def myfunc3():
    t = threading.Thread(target=myfunc2)
    t.start()
    t.join()

def main():
    asyncio.get_child_watcher()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.ensure_future(myfunc3()))
    loop.close()

main()

Ответы [ 2 ]

0 голосов
/ 12 января 2019

Я думаю, что это исправляет.Используйте цикл run_in_executor для потоков.

import asyncio
import shlex
import threading
import subprocess
import logging
async def sendcmd(cmd):
    cmdseq = tuple(shlex.split(cmd))
    print(cmd)
    p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(p.pid)
    output = (await asyncio.wait_for(p.communicate(), 5))[0]
    output = output.decode('utf8')
    print(output)
    return output

async def myfunc(cmd):
    o = await sendcmd(cmd)
    return o

def myfunc2():
    thread_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(thread_loop)
    thread_loop.set_debug(True)     
    tasks = []
    tasks.append(asyncio.ensure_future(myfunc('uname -a')))
    thread_loop.run_until_complete(asyncio.gather(*tasks))
    thread_loop.close()

async def myfunc3(loop=None):
    await loop.run_in_executor(None, myfunc2)    

def main():
    logfilename='test.log'
    print('Writing log to {}'.format(logfilename))
    logging.basicConfig(filename=logfilename, level=logging.INFO, format='%(asctime)s %(name)s %(module)s %(levelname)-8s %(message)s')
    logging.getLogger('asyncio').setLevel(logging.DEBUG)
    root = logging.getLogger(__name__)

    cw=asyncio.get_child_watcher()
    main_loop = asyncio.get_event_loop()
    main_loop.run_until_complete(asyncio.ensure_future(myfunc3(loop=main_loop)))
    cw.close()
    main_loop.close()

main()
0 голосов
/ 09 января 2019

Похоже, что подпроцесс SIGCHLD принимается не рабочим потоком, а родителем.Это означает, что process.wait () не будет сигнализироваться операционной системой.Здесь есть еще одно обсуждение .

Похоже, что дочерний наблюдатель должен обнаружить SIGCHLD и распространить его на другие потоки (или pids) и их циклы событий, что также кажетсябыть его главной целью дизайна.(Документация отсутствует, поэтому требуется чтение исходного кода.)

Примечание: я думаю, что t.join () блокирует основной поток , который запускает дочерний наблюдатель, так чтодолжен быть исправлен.Я просто поместил цикл while и завершил основной цикл обработки событий, когда t.is_alive () возвращает False.

Я заметил, что signal_noop срабатывает, так что это хорошо.Кажется, проблема связана с signal.set_wakeup_fd (self._csock.fileno ()), который, кажется, установлен правильно.Мне нужно немного больше отладить, чтобы узнать, как обрабатывается это событие и почему основной цикл событий не получает этот сигнал.Я сейчас замечаю, что _process_self_data (self, data) в unix_events.py не происходит.

Сигналы и потоки

Обработчики сигналов Python всегда выполняются в основном потоке Python, даже если сигнал был получен в другом потоке.Это означает, что сигналы не могут использоваться как средство межпотоковой связи.Вместо этого вы можете использовать примитивы синхронизации из потокового модуля.

Кроме того, только основной поток может устанавливать новый обработчик сигнала.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...