Как использовать subprocess.Popen для соединения нескольких процессов по каналам? - PullRequest
43 голосов
/ 17 ноября 2008

Как выполнить следующую команду оболочки с помощью модуля Python subprocess?

echo "input data" | awk -f script.awk | sort > outfile.txt

Входные данные будут поступать из строки, поэтому мне на самом деле не нужно echo. Я дошел до этого, кто-нибудь может объяснить, как я получаю его по каналу через sort тоже?

p_awk = subprocess.Popen(["awk","-f","script.awk"],
                          stdin=subprocess.PIPE,
                          stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )

ОБНОВЛЕНИЕ : Обратите внимание, что хотя принятый ниже ответ на самом деле не отвечает на поставленный вопрос, я считаю, что С.Лотт прав, и лучше вообще не решать эту проблему!

Ответы [ 8 ]

38 голосов
/ 17 ноября 2008

Вы были бы немного счастливее со следующим.

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
    stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )

Делегировать часть работы в оболочку. Пусть он соединит два процесса с конвейером.

Вы были бы намного счастливее переписать 'script.awk' в Python, исключив awk и конвейер.

Редактировать . Некоторые из причин предположить, что awk не помогает.

[Слишком много причин, чтобы отвечать через комментарии.]

  1. Awk добавляет незначительный шаг. В обработке awk нет ничего уникального, что Python не обрабатывает.

  2. Конвейерная обработка от awk к сортировке для больших наборов данных может сократить время обработки. Для коротких наборов данных это не имеет существенного преимущества. Быстрое измерение awk >file ; sort file и awk | sort поможет выявить параллелизм. С сортировкой это редко помогает, потому что сортировка не является сквозным фильтром.

  3. Простота обработки "Python to sort" (вместо "Python to awk to sort") предотвращает точный тип вопросов, задаваемых здесь.

  4. Python - хотя и более многословный, чем awk - также явный, где awk имеет определенные неявные правила, которые непрозрачны для новичков и вводят в заблуждение неспециалистов.

  5. Awk (как и сам скрипт оболочки) добавляет еще один язык программирования. Если все это можно сделать на одном языке (Python), устранение оболочки и программирование на awk исключают два языка программирования, что позволяет кому-то сосредоточиться на составляющих ценность частях задачи.

Итог: awk не может добавить значительную ценность. В этом случае awk является чистой стоимостью; это добавило достаточно сложности, чтобы пришлось задать этот вопрос. Удаление awk будет чистой прибылью.

Боковая панель Почему строить трубопровод (a | b) так сложно.

Когда оболочка сталкивается с a | b, она должна сделать следующее.

  1. Разветвите дочерний процесс оригинальной оболочки. Это в конечном итоге станет б.

  2. Сборка операционной трубы. (не подпроцесс Python.PIPE), но вызовите os.pipe(), который возвращает два новых файловых дескриптора, которые связаны через общий буфер. На этом этапе у процесса есть stdin, stdout, stderr от его родителя, плюс файл, который будет "a's stdout" и "b's stdin".

  3. Вилка ребенка. Ребенок заменяет свой стандартный вывод новым стандартным выводом. Выполните процесс a.

  4. Дочерний объект b закрывает, заменяет его стандартный ввод новым стандартным b. Выполните процесс b.

  5. Ребенок b ожидает завершения.

  6. Родитель ожидает завершения b.

Я думаю, что вышеупомянутое может быть использовано рекурсивно для порождения a | b | c, но вы должны неявно заключить в скобки длинные конвейеры, трактуя их как a | (b | c).

Так как в Python есть os.pipe(), os.exec() и os.fork(), и вы можете заменить sys.stdin и sys.stdout, есть способ сделать это в чистом Python. В самом деле, вы можете работать с некоторыми сочетаниями клавиш, используя os.pipe() и subprocess.Popen.

Однако проще делегировать эту операцию оболочке.

22 голосов
/ 06 февраля 2012
import subprocess

some_string = b'input_data'

sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in, 
                 stdin=subprocess.PIPE).communicate(some_string)
17 голосов
/ 23 мая 2013

Чтобы эмулировать конвейер оболочки:

from subprocess import check_call

check_call('echo "input data" | a | b > outfile.txt', shell=True)

без вызова оболочки (см. 17.1.4.2. Замена конвейера оболочки ):

#!/usr/bin/env python
from subprocess import Popen, PIPE

a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
    with a.stdout, open("outfile.txt", "wb") as outfile:
        b = Popen(["b"], stdin=a.stdout, stdout=outfile)
    a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already

plumbum предоставляет некоторый синтаксический сахар:

#!/usr/bin/env python
from plumbum.cmd import a, b # magic

(a << "input data" | b > "outfile.txt")()

Аналог:

#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt

есть:

#!/usr/bin/env python
from plumbum.cmd import awk, sort

(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()
3 голосов
/ 17 ноября 2008

http://www.python.org/doc/2.5.2/lib/node535.html это очень хорошо. Есть ли какая-то часть этого, которую вы не поняли?

Ваша программа будет очень похожа, но вторая Popen будет иметь stdout = для файла, и вам не понадобится вывод ее .communicate().

2 голосов
/ 09 декабря 2018

Принятый ответ обходит стороной проблему. Вот фрагмент кода, который объединяет результаты нескольких процессов: Обратите внимание, что он также печатает (несколько) эквивалентную команду оболочки, чтобы вы могли запустить ее и убедиться, что вывод верен.

#!/usr/bin/env python3

from subprocess import Popen, PIPE

# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)

print("Output from last process : " + (p3.communicate()[0]).decode())

# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)
2 голосов
/ 10 февраля 2015

Вдохновленный ответом @ Cristian. Я встретил точно такую ​​же проблему, но с другой командой. Поэтому я выкладываю свой проверенный пример, который, как мне кажется, может быть полезен:

grep_proc = subprocess.Popen(["grep", "rabbitmq"],
                             stdin=subprocess.PIPE, 
                             stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()

Это проверено.

Что сделано

  • Объявлено ленивое grep выполнение с stdin из pipe. Эта команда будет выполнена при выполнении команды ps, когда канал будет заполнен стандартным выводом ps.
  • Вызывается первичная команда ps с stdout, направленным на канал, используемый командой grep.
  • Grep связался, чтобы получить stdout из трубы.

Мне нравится этот способ, потому что это естественная концепция труб, аккуратно обернутая subprocess интерфейсами.

1 голос
/ 18 ноября 2017

Предыдущие ответы упустили важный момент. Замена конвейера оболочки в основном правильная, как указывает geocar. почти достаточно для запуска communicate на последнем элементе трубы.

Остается проблема с передачей входных данных в конвейер. С несколькими подпроцессами простой communicate(input_data) на последнем элементе не работает - он зависает навсегда. Вам нужно вручную создать конвейер и дочерний элемент, например:

import os
import subprocess

input = """\
input data
more input
""" * 10

rd, wr = os.pipe()
if os.fork() != 0: # parent
    os.close(wr)
else:              # child
    os.close(rd)
    os.write(wr, input)
    os.close(wr)
    exit()

p_awk = subprocess.Popen(["awk", "{ print $2; }"],
                         stdin=rd,
                         stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"], 
                          stdin=p_awk.stdout,
                          stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())

Теперь дочерний элемент обеспечивает ввод через канал, а родительские вызовы взаимодействуют (), что работает, как и ожидалось. При таком подходе вы можете создавать произвольные длинные конвейеры, не прибегая к «делегированию части работы оболочке». К сожалению, документация подпроцесса 1015 * не упоминает об этом.

Есть способы добиться того же эффекта без труб:

from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)

Теперь используйте stdin=tf для p_awk. Это вопрос вкуса, что вы предпочитаете.

Выше все равно не на 100% эквивалентно bash-конвейерам, потому что обработка сигналов отличается. Это можно увидеть, если добавить еще один элемент трубы, который усекает вывод sort, например head -n 10. С кодом выше, sort выведет сообщение об ошибке «Сломанная труба» на stderr. Вы не увидите это сообщение при запуске того же конвейера в оболочке. (Это единственное отличие, результат в stdout тот же). Кажется, причина в том, что Popen Python устанавливает SIG_IGN для SIGPIPE, тогда как оболочка оставляет его на SIG_DFL, и обработка сигналов sort отличается в этих двух случаях.

1 голос
/ 04 декабря 2014

РЕДАКТИРОВАТЬ: pipes доступно в Windows, но, что важно, на самом деле не работает в Windows. Смотрите комментарии ниже.

Стандартная библиотека Python теперь включает модуль pipes для обработки этого:

https://docs.python.org/2/library/pipes.html, https://docs.python.org/3.4/library/pipes.html

Я не уверен, как долго работает этот модуль, но этот подход кажется намного проще, чем копаться с subprocess.

...