Объединение больших файлов, трубопроводов и бонусов - PullRequest
2 голосов
/ 27 марта 2012

Были заданы похожие вопросы (и даны ответы), но никогда не было вместе, и я не могу заставить что-то работать. Так как я только начинаю с Python, что-то простое для понимания было бы здорово!

У меня есть 3 больших файла данных (> 500 ГБ), которые мне нужно распаковать, объединить, направить в подпроцесс, а затем направить вывод в другой подпроцесс. Затем мне нужно обработать тот окончательный вывод, который я хотел бы сделать в Python. Примечание. Мне не нужен разархивированный и / или составной файл, за исключением обработки - создание, я думаю, было бы пустой тратой пространства. Вот что у меня пока ...

import gzip
from subprocess import Popen, PIPE

#zipped files
zipfile1 = "./file_1.txt.gz"   
zipfile2 = "./file_2.txt.gz"  
zipfile3 = "./file_3.txt.gz"


# Open the first pipe
p1 = Popen(["dataclean.pl"], stdin=PIPE, stdout=PIPE)

# Unzip the files and pipe them in (has to be a more pythonic way to do it - 
# if this is even correct)
unzipfile1 = gzip.open(zipfile1, 'wb')
p1.stdin.write(unzipfile1.read())
unzipfile1.close()

unzipfile2 = gzip.open(zipfile2, 'wb')
p1.stdin.write(unzipfile2.read())
unzipfile2.close()

unzipfile3 = gzip.open(zipfile3, 'wb')
p1.stdin.write(unzipfile3.read())
unzipfile3.close()


# Pipe the output of p1 to p2
p2 = Popen(["dataprocess.pl"], stdin=p1.stdout, stdout=PIPE)

# Not sure what this does - something about a SIGPIPE
p1.stdout.close()

## Not sure what this does either - but it is in the pydoc
output = p2.communicate()[0]

## more processing of p2.stdout...
print p2.stdout

Любые предложения будут с благодарностью. * В качестве дополнительного вопроса ... Пидок для чтения () говорит:

"Также обратите внимание, что в неблокирующем режиме может быть возвращено меньше данных, чем было запрошено, даже если не задан параметр размера."

Это кажется страшным. Кто-нибудь может это интерпретировать? Я не хочу читать только часть набора данных, думая, что это все. Я думал, что оставить размер файла - это хорошо, особенно когда я не знаю размер файла.

Спасибо

GK

1 Ответ

4 голосов
/ 27 марта 2012

обо всем по порядку; Я думаю, что у вас неправильные режимы:

unzipfile1 = gzip.open(zipfile1, 'wb')

Это должно открыться zipfile1 для записи , а не чтения. Я надеюсь, что ваши данные все еще существуют.

Во-вторых, вы не хотите пытаться работать со всеми данными одновременно . Вы должны работать с данными в блоках по 16 КБ или 32 КБ или что-то. (Оптимальный размер будет варьироваться в зависимости от многих факторов; сделайте его настраиваемым, если эту задачу нужно выполнять много раз, чтобы можно было рассчитывать разные размеры.)

То, что вы ищете, вероятно, больше похоже на этот непроверенный псевдокод:

while (block = unzipfile1.read(4096*4)):
    p1.stdin.write(a)

Если вы пытаетесь соединить несколько процессов в конвейере в Python, то это, вероятно, будет выглядеть примерно так:

while (block = unzipfile1.read(4096*4)):
    p1.stdin.write(a)
    p2.stdin.write(p1.stdout.read())

Это дает вывод от p1 до p2 как можно быстрее. Я сделал предположение, что p1 не будет генерировать значительно больше ввода, чем было дано. Если вывод p1 будет в десять раз больше, чем ввод, то вы должны сделать еще один цикл, похожий на этот.


Но, я должен сказать, это похоже на дополнительную работу по копированию сценария оболочки:

gzip -cd file1.gz file2.gz file3.gz | dataclean.py | dataprocess.pl

gzip(1) будет автоматически обрабатывать передачу данных размером с блок, как я описал выше, и при условии, что ваши dataclean.py и dataprocess.pl сценарии также работают с данными в блоках, а не выполняют полное чтение (как и в оригинальной версии этого скрипта), тогда все они должны работать параллельно, в меру своих возможностей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...