взаимоблокировка с использованием subprocess, pty и threadpools - PullRequest
3 голосов
/ 13 октября 2019

У меня есть особый случай, когда я хотел бы подделать tty для подпроцессов, которые запускаются в ThreadPoolExecutor (например, xargs -p) и захватить вывод.

Я создалследующие, которые, кажется, работают хорошо серийно:

import contextlib
import concurrent.futures
import errno
import os
import subprocess
import termios


@contextlib.contextmanager
def pty():
    r, w = os.openpty()
    try:
        yield r, w
    finally:
        for fd in r, w:
            try:
                os.close(fd)
            except OSError:
                pass


def cmd_output_p(*cmd):
    with pty() as (r, w):
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=w, stderr=w)
        os.close(w)
        proc.stdin.close()

        buf = b''
        while True:
            try:
                bts = os.read(r, 1024)
            except OSError as e:
                if e.errno == errno.EIO:
                    bts = b''
                else:
                    raise
            else:
                buf += bts
            if not bts:
                break

    return proc.wait(), buf

и некоторые примеры использования:

>>> cmd_output_p('python', '-c', 'import sys; print(sys.stdout.isatty())')
(0, b'True\r\n')

потрясающе! однако, когда я запускаю эту же процедуру в concurrent.futures.ThreadPoolExecutor, существует довольно много режимов отказов (есть еще один более редкий режим отказов, при котором OSError: [Errno 9] Bad file descriptor происходит на кажущейся случайной строке кода - но я не выделил воспроизведение дляэто все еще)

например, следующий код:

def run(arg):
    print(cmd_output_p('echo', arg))


with concurrent.futures.ThreadPoolExecutor(5) as exe:
    exe.map(run, ('1',) * 1000)

Этот процесс выполняется через несколько процессов, а затем в конечном итоге зависает. Выдача ^C дает следующие стековые трассировки (для завершения процесса дважды требуется ^ C)

$ python3 t.py
...

(0, b'1\r\n')
(0, b'1\r\n')
(0, b'1\r\n')
^CTraceback (most recent call last):
  File "t.py", line 49, in <module>
    exe.map(run, ('1',) * 1000)
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 611, in __exit__
    self.shutdown(wait=True)
  File "/usr/lib/python3.6/concurrent/futures/thread.py", line 152, in shutdown
    t.join()
  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

предположительно я делаю что-то не так, но все примеры, которые я нашел для взаимодействия сsubprocess / pty выполняет те же шаги - как я могу предотвратить этот тупик?

1 Ответ

1 голос
/ 13 октября 2019

Файловый дескриптор, открытый как w одним потоком, может быть повторно использован другим потоком между вызовом os.close в cmd_output_p и вызовом в pty. В этом случае он неожиданно закрывается вторым close (который в противном случае потерпел бы неудачу «безвредно» с EBADF). (Практически в любой многопоточной программе EBADF следует рассматривать как ошибку подтверждения.)

Это, безусловно, может вызвать EBADF из другого потока. Если файловый дескриптор будет вновь открыт, это может также привести к взаимоблокировке, поскольку оба конца псевдотерминала предназначены для чтения-записи.

...