Эффективные методы буферизации и сканирования файлов для больших файлов в Python - PullRequest
9 голосов
/ 26 января 2011

Описание проблемы, с которой я столкнулся, немного сложнее, и я ошибусь в предоставлении более полной информации.Для нетерпеливых, вот краткий способ, которым я могу суммировать это:

Какой самый быстрый (наименьшее время выполнения) способ разбить текстовый файл на ВСЕ (перекрывающиеся) подстроки размера N (связанный)N, например, 36), выбрасывая символы новой строки.

Я пишу модуль, который анализирует файлы в формате генома FASTA ascii.Эти файлы содержат так называемый человеческий эталонный геном hg18, который вы можете загрузить из браузера UCSC генома *1008* (слизняки!), Если хотите.

Как вы заметите,файлы генома состоят из chr [1..22] .fa и chr [XY] .fa, а также из набора других небольших файлов, которые не используются в этом модуле.

Несколько модулей уже существуютдля анализа файлов FASTA, таких как SeqIO в BioPython.(Извините, я бы опубликовал ссылку, но у меня пока нет точек для этого.) К сожалению, каждый модуль, который я смог найти, не выполняет определенную операцию, которую я пытаюсь выполнить.

Мой модуль должен разделить данные генома (например, «CAGTACGTCAGACTATACGGAGCTA» может быть строкой) на каждую перекрывающуюся подстроку N-длины.Позвольте мне привести пример с использованием очень маленького файла (реальные файлы хромосом имеют длину от 355 до 20 миллионов символов) и N = 8

>>>import cStringIO
>>>example_file = cStringIO.StringIO("""\
>header
CAGTcag
TFgcACF
""")
>>>for read in parse(example_file):
...    print read
...
CAGTCAGTF
AGTCAGTFG
GTCAGTFGC
TCAGTFGCA
CAGTFGCAC
AGTFGCACF

Функция, которую я обнаружил, имела абсолютную наилучшую производительность из методовЯ мог бы подумать вот о чем:


def parse(file):
  size = 8 # of course in my code this is a function argument
  file.readline() # skip past the header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

Это работает, но, к сожалению, для анализа генома человека таким образом все еще требуется около 1,5 часов (см. Примечание ниже).Возможно, это лучшее из того, что я собираюсь увидеть с помощью этого метода (может быть необходим полный рефакторинг кода, но я бы хотел этого избежать, так как этот подход имеет некоторые весьма специфические преимущества в других областях кода), но яЯ подумал, что передам это сообществу.

Спасибо!

  • Обратите внимание, что на этот раз нужно много дополнительных вычислений, таких как вычисление чтения противоположной цепи и выполнение поиска по хеш-таблицехэш размером примерно 5G.

Заключение после ответа: Оказывается, что с помощью fileobj.read () и затем манипулирует получающейся строкой (string.replace ()и т. д.) занимало относительно мало времени и памяти по сравнению с остальной частью программы, и поэтому я использовал этот подход.Спасибо всем!

Ответы [ 4 ]

4 голосов
/ 26 января 2011

Не могли бы вы отобразить файл и начать просматривать его с помощью скользящего окна? Я написал маленькую глупую программу, которая работает довольно мало:

USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
sarnold  20919  0.0  0.0  33036  4960 pts/2    R+   22:23   0:00 /usr/bin/python ./sliding_window.py

Работа через файл фаста 636229 байт (найден через http://biostar.stackexchange.com/questions/1759) заняла 0,383 секунды.

#!/usr/bin/python

import mmap
import os

  def parse(string, size):
    stride = 8
    start = string.find("\n")
    while start < size - stride:
        print string[start:start+stride]
        start += 1

fasta = open("small.fasta", 'r')
fasta_size = os.stat("small.fasta").st_size
fasta_map = mmap.mmap(fasta.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
parse(fasta_map, fasta_size)
3 голосов
/ 26 января 2011

Некоторые классические изменения, связанные с вводом-выводом.

  • Используйте операцию чтения более низкого уровня, такую ​​как os.read, и считывайте в большой фиксированный буфер.
  • Используйте многопоточность / многопроцессорность, когда читаетсяи буферы, и другие процессы.
  • Если у вас несколько процессоров / машин, используйте многопроцессорную обработку / mq для разделения обработки между процессорами, как и map-Reduce.

Использование операции чтения более низкого уровняне было бы так много переписать.Остальные были бы довольно большими, переписывает.

3 голосов
/ 26 января 2011

Я подозреваю, что проблема в том, что у вас столько данных, которые хранятся в строковом формате, что действительно бесполезно для вашего случая использования, что у вас заканчивается реальная память и происходит перестановка. 128 ГБ должно быть достаточно , чтобы этого избежать ...:)

Поскольку вы указали в комментариях, что вам все равно нужно хранить дополнительную информацию, отдельный класс, который ссылается народительская строка будет моим выбором.Я провел короткий тест, используя chr21.fa из chromFa.zip из hg18;файл составляет около 48 МБ и чуть менее 1 млн. строк.У меня здесь только 1 ГБ памяти, поэтому я просто выбрасываю объекты потом.Таким образом, этот тест не будет показывать проблемы с фрагментацией, кэшем или другими, но я думаю, что он должен быть хорошей отправной точкой для измерения пропускной способности парсинга:

import mmap
import os
import time
import sys

class Subseq(object):
  __slots__ = ("parent", "offset", "length")

  def __init__(self, parent, offset, length):
    self.parent = parent
    self.offset = offset
    self.length = length

  # these are discussed in comments:
  def __str__(self):
    return self.parent[self.offset:self.offset + self.length]

  def __hash__(self):
    return hash(str(self))

  def __getitem__(self, index):
    # doesn't currently handle slicing
    assert 0 <= index < self.length
    return self.parent[self.offset + index]

  # other methods

def parse(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Subseq(whole, offset, size)

class Seq(object):
  __slots__ = ("value", "offset")
  def __init__(self, value, offset):
    self.value = value
    self.offset = offset

def parse_sep_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Seq(whole[offset:offset + size], offset)

def parse_plain_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_tuple(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield (whole, offset, size)

def parse_orig(file, size=8):
  file.readline() # skip header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

def parse_os_read(file, size=8):
  file.readline()  # skip header
  file_size = os.fstat(file.fileno()).st_size
  whole = os.read(file.fileno(), file_size).replace("\n", "").upper()
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_mmap(file, size=8):
  file.readline()  # skip past the header
  buffer = ""
  for line in file:
    buffer += line
    if len(buffer) >= size:
      for start in xrange(0, len(buffer) - size + 1):
        yield buffer[start:start + size].upper()
      buffer = buffer[-(len(buffer) - size + 1):]
  for start in xrange(0, len(buffer) - size + 1):
    yield buffer[start:start + size]

def length(x):
  return sum(1 for _ in x)

def duration(secs):
  return "%dm %ds" % divmod(secs, 60)


def main(argv):
  tests = [parse, parse_sep_str, parse_tuple, parse_plain_str, parse_orig, parse_os_read]
  n = 0
  for fn in tests:
    n += 1
    with open(argv[1]) as f:
      start = time.time()
      length(fn(f))
      end = time.time()
      print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))

  fn = parse_mmap
  n += 1
  with open(argv[1]) as f:
    f = mmap.mmap(f.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
    start = time.time()
    length(fn(f))
    end = time.time()
  print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))


if __name__ == "__main__":
  sys.exit(main(sys.argv))

1  parse                 1m 42s
2  parse_sep_str         1m 42s
3  parse_tuple           0m 29s
4  parse_plain_str       0m 36s
5  parse_orig            0m 45s
6  parse_os_read         0m 34s
7  parse_mmap            0m 37s

Первые четыре - мой код, тогда как orig - ваш, а последние два - из других ответов здесь.

Пользовательские объекты гораздо дороже создавать и собирать, чем кортежи или простые строки!Это не должно вызывать удивления, но я не осознавал, что это будет иметь большое значение (сравните № 1 и № 3, которые на самом деле отличаются только в определяемом пользователем классе против кортежа).Вы сказали, что хотите сохранить дополнительную информацию, такую ​​как смещение, со строкой в ​​любом случае (как в случаях parse и parse_sep_str), поэтому вы можете рассмотреть возможность реализации этого типа в модуле расширения C.Посмотрите на Cython и связанные с ним, если вы не хотите писать C напрямую.

Ожидается, что случаи # 1 и # 2 будут идентичными: при указании на родительскую строку я пытался сэкономить память, а не время обработки, но этот тест не измеряет это.

1 голос
/ 24 января 2013

У меня есть функция для обработки текстового файла и использования буфера при чтении и записи и параллельных вычислениях с асинхронным пулом пакетов процесса. У меня есть AMD с 2 ядрами, 8 ГБ ОЗУ, с GNU / Linux, и я могу обрабатывать 300000 строк менее чем за 1 секунду, 1000000 строк примерно за 4 секунды и примерно 4500000 строк (более 220 МБ) примерно за 20 секунд:

# -*- coding: utf-8 -*-
import sys
from multiprocessing import Pool

def process_file(f, fo="result.txt", fi=sys.argv[1]):
    fi = open(fi, "r", 4096)
    fo = open(fo, "w", 4096)
    b = []
    x = 0
    result = None
    pool = None
    for line in fi:
        b.append(line)
        x += 1
        if (x % 200000) == 0:
            if pool == None:
                pool = Pool(processes=20)
            if result == None:
                result = pool.map_async(f, b)
            else:
                presult = result.get()
                result = pool.map_async(f, b)
                for l in presult:
                    fo.write(l)
            b = []
    if not result == None:
        for l in result.get():
            fo.write(l)
    if not b == []:
        for l in b:
            fo.write(f(l))
    fo.close()
    fi.close()

Первый аргумент - это функция, которая получает одну строку, обрабатывает и возвращает результат для записи в файл, затем - файл вывода, а последний - файл ввода (последний аргумент нельзя использовать, если вы получили первый параметр в файле сценария. ввода).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...