Почему вызов метода connect () для компонента конвейера, отличного от последнего, может привести к искаженному выводу? - PullRequest
0 голосов
/ 03 апреля 2020

Я хочу сосчитать вхождения около 500 шаблонов в большой файл .fastq (59 миллионов строк). Все шаблоны имеют длину ровно 20 символов.

В unix это будет просто:

grep -F -o -f patterns.txt big_file.fastq | sort | uniq -c

Тем не менее, я sh, чтобы избежать записи временного файла шаблонов, поэтому я создал канал, используя библиотеку подпроцесса python:

from subprocess import Popen, PIPE, STDOUT

p1 = Popen(["grep", "-F", "-o", "-f", "-", "big_file.fastq"], shell = False, stdin = PIPE, stdout = PIPE, stderr= STDOUT)
p2 = Popen(["sort"], shell = False, stdin = p1.stdout, stdout = PIPE, stderr = STDOUT)
p3 = Popen(["uniq", "-c"], shell = False, stdin = p2.stdout, stdout = PIPE, stderr = STDOUT)

Затем я вызываю connect () для этого, предоставляя закодированный файлоподобный объект io.StringIO в качестве входных данных (который передается в grep команда с использованием '-'):

import io

patterns_file = io.StringIO("\n".join(patterns_list))
p3.communicate(input = patterns_file.read().encode('utf-8'))[0]

Когда я вызываю connect () для uniq, как это, это работает нормально.

Однако во время тестирования я ошибочно вызвал его в первой части трубы:

p1.communicate(input = patterns_file.read().encode('utf-8'))[0]

Это дало мне совершенно неверные результаты, включая совпадения, которые были короче или длиннее ожидаемых 20 символов.

Я не понимаю, почему это так. Разве вызов метода connect () на p1 не затрагивает только эту часть канала и игнорирует остальные? Удаление p2 и p3 привело к корректному отображению p1. Я чувствую, что что-то упускаю из-за того, как работает Попен.

Любая помощь приветствуется.

1 Ответ

0 голосов
/ 03 апреля 2020

Когда вы создаете экземпляры Popen объектов, подпроцессы, на которые они ссылаются, немедленно запускаются. Таким образом, даже если вы звоните только communicate() на p1, p2 и p3 также работают.

Почему это важно? Потому что p2 все еще имеет свой стандартный вывод, подключенный к FIFO, в который p1 записывает свой вывод!

Если операция sort на p2 все еще читает содержимое в то же время, что вы спрашиваете у * Программа 1026 * для непосредственного чтения того же контента, в итоге вы получите результат p1, разделенный между ними. Можно ожидать, что веселье произойдет: единственный способ чтения, разделенный между двумя программами , не приведет к явному повреждению данных, это если бы и 1017 *, и communicate() считывали блоки, кратные 20 байтам ( пока еще достаточно мал, чтобы операционная система не разделяла их на несколько системных вызовов); однако типичные размеры фрагментов, используемые для небуферизованного чтения, кратны 4096, создавая смещение по 4 байта от границы записи при каждом чтении фрагмента.


Кстати - для много программ это не будет иметь столь серьезного эффекта, потому что FIFO имеют относительно небольшой буфер; программа, которая записывает строку вывода для каждой строки ввода, которую она считывает, очень быстро блокируется на выходе и, таким образом, прекращает чтение дальнейшего ввода до тех пор, пока ее выход не будет хотя бы частично очищен. sort является исключением, потому что ему нужно прочитать все входные данные, прежде чем он узнает, какой будет его первая строка вывода!

...