[Редактировать: Сначала прочитайте принятый ответ.Приведенное ниже длинное исследование связано с небольшой ошибкой измерения времени.]
Мне часто приходится обрабатывать очень большие (более 100 ГБ) текстовые / CSV-подобные файлы, содержащие сильно избыточные данные, которые практически невозможно сохранить на несжатом диске.Я сильно полагаюсь на внешние компрессоры, такие как lz4 и zstd, которые производят потоки stdout, приближающиеся к 1 ГБ / с.
Поэтому я очень беспокоюсь о производительности конвейеров оболочки Unix.Но большие сценарии оболочки сложно поддерживать, поэтому я стараюсь создавать конвейеры в Python, объединяя команды с осторожным использованием shlex.quote()
.
Этот процесс утомителен и подвержен ошибкам, поэтому я бы хотел«Pythonic» способ достичь той же цели, управляя файловыми дескрипторами stdin / stdout в Python без разгрузки до /bin/sh
.Однако я никогда не находил способ сделать это без значительного снижения производительности.
Документация Python 3 рекомендует заменять конвейеры оболочки методом communicate()
на subprocess.Popen
.Я адаптировал этот пример для создания следующего тестового сценария, который переносит 3 ГБ /dev/zero
в бесполезный grep
, который ничего не выводит:
#!/usr/bin/env python3
from shlex import quote
from subprocess import Popen, PIPE
from time import perf_counter
BYTE_COUNT = 3_000_000_000
UNQUOTED_HEAD_CMD = ["head", "-c", str(BYTE_COUNT), "/dev/zero"]
UNQUOTED_GREP_CMD = ["grep", "Arbitrary string which will not be found."]
QUOTED_SHELL_PIPELINE = " | ".join(
" ".join(quote(s) for s in cmd)
for cmd in [UNQUOTED_HEAD_CMD, UNQUOTED_GREP_CMD]
)
perf_counter()
proc = Popen(QUOTED_SHELL_PIPELINE, shell=True)
proc.wait()
print(f"Time to run using shell pipeline: {perf_counter()} seconds")
perf_counter()
p1 = Popen(UNQUOTED_HEAD_CMD, stdout=PIPE)
p2 = Popen(UNQUOTED_GREP_CMD, stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()
p2.communicate()
print(f"Time to run using subprocess.PIPE: {perf_counter()} seconds")
Вывод:
Time to run using shell pipeline: 2.412427189 seconds
Time to run using subprocess.PIPE: 4.862174164 seconds
Подход subprocess.PIPE
медленнее, чем /bin/sh
.Если мы увеличим входной размер до 90 ГБ (BYTE_COUNT = 90_000_000_000
), мы подтвердим, что это не накладные расходы постоянного времени:
Time to run using shell pipeline: 88.796322932 seconds
Time to run using subprocess.PIPE: 183.734968687 seconds
До сих пор я предполагал, что subprocess.PIPE
- это просто высокий уровеньабстракция для подключения файловых дескрипторов, и эти данные никогда не копируются в сам процесс Python.Как и ожидалось, при выполнении вышеуказанного теста head
использует 100% ЦП, но subproc_test.py
использует почти нулевой ЦП и ОЗУ.
Учитывая это, почему мой конвейер такой медленный?Это внутреннее ограничение Python subprocess
?Если да, то что же делает /bin/sh
по-другому под капотом, что делает его вдвое быстрее?
В целом, существуют ли лучшие методы для создания больших высокопроизводительных конвейеров подпроцесса в Python?