Обработка больших файлов по частям: непоследовательный поиск с readline - PullRequest
7 голосов
/ 24 апреля 2019

Я пытаюсь прочитать и обработать большой файл кусками с помощью Python. Я слежу за этим блогом , который предлагает очень быстрый способ чтения и обработки больших фрагментов данных, распределенных по нескольким процессам. Я лишь немного обновил существующий код, то есть, используя stat(fin).st_size вместо os.path.getsize. В этом примере я также не реализовал многопроцессорность, поскольку проблема также проявляется в одном процессе. Это облегчает отладку.

Проблема с этим кодом заключается в том, что он возвращает неработающие предложения. Это имеет смысл: указатели не учитывают окончания строки, а просто возвращают некоторый заданный размер в байтах. На практике можно было бы предположить, что вы можете решить эту проблему, пропустив последний элемент в извлеченном пакете строк, так как это, скорее всего, будет прерывистой линией. К сожалению, это тоже не работает надежно.

from os import stat


def chunkify(pfin, buf_size=1024):
    file_end = stat(pfin).st_size
    with open(pfin, 'rb') as f:
        chunk_end = f.tell()

        while True:
            chunk_start = chunk_end
            f.seek(buf_size, 1)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start

            if chunk_end > file_end:
                break


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        batch = f.read(chunk_size).splitlines()

    # changing this to batch[:-1] will result in 26 lines total
    return batch


if __name__ == '__main__':
    fin = r'data/tiny.txt'
    lines_n = 0
    for start, size in chunkify(fin):
        lines = process_batch(fin, start, size)
        # Uncomment to see broken lines
        # for line in lines:
        #    print(line)
        # print('\n')
        lines_n += len(lines)

    print(lines_n)
    # 29

Приведенный выше код выведет 29 как общее количество обработанных строк. Если вы не вернете последний элемент пакета, наивно полагая, что это в любом случае пунктирная линия, вы получите 26. Фактическое количество строк - 27. Данные тестирования приведены ниже.

She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was $13.2 million and $7.1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1

Если вы распечатаете созданные строки, вы увидите, что действительно, встречаются прерывистые предложения. Я нахожу это странным. Разве f.readline() не должен гарантировать, что файл будет прочитан до следующей строки? В выводе ниже пустая строка разделяет две партии. Это означает, что вы не можете проверить строку со следующей строкой в ​​пакете и удалить ее, если она является подстрокой - ломаное предложение принадлежит другому пакету, чем полное предложение.

...
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, r


In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
...

Есть ли способ избавиться от этих разбитых предложений, не удаляя слишком много?

Вы можете скачать тестовый файл большего размера (100 000 строк) здесь .


После многих копаний кажется, что на самом деле какой-то недоступный буфер ответственен за непоследовательное поведение поиска, как обсуждалось здесь и здесь . Я опробовал предлагаемое решение для использования iter(f.readline, '') с seek, но это все еще дает мне противоречивые результаты. Я обновил свой код, чтобы он возвращал указатель файла после каждой партии из 1500 строк, но в действительности возврат пакетов будет перекрываться.

from os import stat
from functools import partial


def chunkify(pfin, max_lines=1500):
    file_end = stat(pfin).st_size
    with open(pfin, 'r', encoding='utf-8') as f:
        chunk_end = f.tell()

        for idx, l in enumerate(iter(f.readline, '')):
            if idx % max_lines == 0:
                chunk_start = chunk_end
                chunk_end = f.tell()
                # yield start position, size, and is_last
                yield chunk_start, chunk_end - chunk_start

    chunk_start = chunk_end
    yield chunk_start, file_end


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size).splitlines()

    batch = list(filter(None, chunk))

    return batch


if __name__ == '__main__':
    fin = r'data/100000-ep+gutenberg+news+wiki.txt'

    process_func = partial(process_batch, fin)
    lines_n = 0

    prev_last = ''
    for start, size in chunkify(fin):
        lines = process_func(start, size)

        if not lines:
            continue

        # print first and last ten sentences of batch
        for line in lines[:10]:
            print(line)
        print('...')
        for line in lines[-10:]:
            print(line)
        print('\n')

        lines_n += len(lines)

    print(lines_n)

Пример перекрывающихся партий приведен ниже. Первые два с половиной предложения последней партии дублируются из последних предложений этой партии ранее. Я не знаю, как объяснить или решить это.

...
The EC ordered the SFA to conduct probes by June 30 and to have them confirmed by a certifying authority or it would deduct a part of the funding or the entire sum from upcoming EU subsidy payments.
Dinner for two, with wine, 250 lari.
It lies a few kilometres north of the slightly higher Weissmies and also close to the slightly lower Fletschhorn on the north.
For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one els


For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one else, whatever their insights or artistic pedigree, is quite as interesting.
Sajid Nadiadwala's reboot version of his cult classic "Judwaa", once again directed by David Dhawan titled "Judwaa 2" broke the dry spell running at the box office in 2017.
They warned that there will be a breaking point, although it is not clear what that would be.
...

В дополнение к этому я также попытался удалить readline из исходного кода и отследить оставшийся неполный фрагмент. Неполный кусок затем передается следующему фрагменту и добавляется к его фронту. Проблема, с которой я сейчас сталкиваюсь, заключается в том, что, поскольку текст читается в байтовых чанках, может случиться так, что чанк заканчивается без полного завершения байтов символа. Это приведет к ошибкам декодирования.

from os import stat


def chunkify(pfin, buf_size=1024):
    file_end = stat(pfin).st_size
    with open(pfin, 'rb') as f:
        chunk_end = f.tell()

        while True:
            chunk_start = chunk_end
            f.seek(buf_size, 1)
            chunk_end = f.tell()
            is_last = chunk_end >= file_end
            # yield start position, size, and is_last
            yield chunk_start, chunk_end - chunk_start, is_last

            if is_last:
                break


def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size)

    # Add previous leftover to current chunk
    chunk = leftover + chunk
    batch = chunk.splitlines()
    batch = list(filter(None, batch))

    # If this chunk is not the last one,
    # pop the last item as that will be an incomplete sentence
    # We return this leftover to use in the next chunk
    if not is_last:
        leftover = batch.pop(-1)

    return batch, leftover


if __name__ == '__main__':
    fin = r'ep+gutenberg+news+wiki.txt'

    lines_n = 0
    left = ''
    for start, size, last in chunkify(fin):
        lines, left = process_batch(fin, start, size, last, left)

        if not lines:
            continue

        for line in lines:
            print(line)
        print('\n')

        numberlines = len(lines)
        lines_n += numberlines

    print(lines_n)

Запуск приведенного выше кода неизбежно приведет к UnicodeDecodeError.

Traceback (most recent call last):
  File "chunk_tester.py", line 46, in <module>
    lines, left = process_batch(fin, start, size, last, left)
  File "chunk_tester.py", line 24, in process_batch
    chunk = f.read(chunk_size)
  File "lib\codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa9 in position 0: invalid start byte

Ответы [ 3 ]

2 голосов
/ 03 мая 2019

У вас есть интересная проблема здесь.У вас есть n процессы, которым назначается каждое местоположение фрагмента данных для обработки, но вы не можете предоставить точное местоположение фрагментов, потому что вы имеете дело со строками, а ваши местоположения в байтах,Даже если вы разбили файл на строки, чтобы получить точное местоположение фрагментов, у вас возникли некоторые проблемы.

Вот решение, которое является неоптимальным (я предполагаю, что вы не хотите обрабатывать строки последовательно: это кажется очевидным):

  • разрезать файл на фрагменты, как при первой попытке;
  • для каждого фрагмента, найти первую и последнюю строку перевода.Формат чанка: B\nM\nA, где B (до) и A (после) не содержат перевода строки, но M может содержать переводы строки;
  • обрабатывает строки в M и поместите B\nA в список с текущим индексом чанка;
  • наконец, обработайте все B\nA элементы.

Это неоптимально, поскольку после обработки каждого M, вам все еще нужно обработать все B\nA, и эта последняя работа должна ждать завершения других процессов.

Вот код:

def chunkify(file_end, buf_size=1024):
    """Yield chunks of `buf_size` bytes"""
    for chunk_start in range(0, file_end, buf_size):
        yield chunk_start, min(buf_size, file_end - chunk_start)

def process_batch(remainders, i, f, chunk_start, chunk_size):
    """Process a chunk"""
    f.seek(chunk_start)
    chunk = f.read(chunk_size)
    chunk, remainders[i] = normalize(chunk)
    # process chunk here if chunk is not None
    return chunk

def normalize(chunk):
    """Return `M, B\\nA`
    The chunk format is `B\\nM\\nA` where `B` (before) and `A` (after) do not contains any line feed,
    but `M` may contain line feeds"""
    i = chunk.find(b"\n")
    j = chunk.rfind(b"\n")
    if i == -1 or i == j:
        return None, chunk
    else:
        return chunk[i+1:j], chunk[:i]+chunk[j:]

Обратите внимание, что если чанк не имеетпосередине (M часть), затем мы возвращаем None как кусок, и все отправляется на remainders.

Некоторые тесты:

text = """She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was $13.2 million and $7.1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1"""

import io, os

def get_line_count(chunk):
    return 0 if chunk is None else len(chunk.split(b"\n"))

def process(f, buf_size):
    f.seek(0, os.SEEK_END)
    file_end = f.tell()
    remainders = [b""]*(file_end//buf_size + 1)
    L = 0
    for i, (start, n) in enumerate(chunkify(file_end, buf_size)):
        chunk = process_batch(remainders, i, f, start, n)
        L += get_line_count(chunk)

    print("first pass: lines processed", L)
    print("remainders", remainders)
    last_chunk = b"".join(remainders)
    print("size of last chunk {} bytes, {} lines".format(len(last_chunk), get_line_count(last_chunk)))
    L += get_line_count(last_chunk)
    print("second pass: lines processed", L)

process(io.BytesIO(bytes(text, "utf-8")), 256)
process(io.BytesIO(bytes(text, "utf-8")), 512)

with open("/home/jferard/prog/stackoverlfow/ep+gutenberg+news+wiki.txt", 'rb') as f:
    process(f, 4096)
with open("/home/jferard/prog/stackoverlfow/ep+gutenberg+news+wiki.txt", 'rb') as f:
    process(f, 16384)

Вывод:

first pass: lines processed 18
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked.\nWhat sort', b" of things do YOU remember best?'\nFor women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for br", b'east cancer.\nBut, Frum explai', b'ns: "Glenn Beck takes it into his head that this guy is bad news."\nThe EAC was created in 2002 to help avoid a repeat of the dispu', b'ted 2000 presidential election.\nThe auction hig', b"hlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disor", b'der.\nAlso there is Meghn', b'a River in the west of Brahmanbaria.\nHer mother first to', b'ok her daughter swimming at the age of three to help her with her cerebal palsy.\nS', b'ep 19: Eibar (h) WON 6-1']
size of last chunk 880 bytes, 9 lines
second pass: lines processed 27

first pass: lines processed 21
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked.\nFor women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for br', b'east cancer.\nThe EAC was created in 2002 to help avoid a repeat of the dispu', b"ted 2000 presidential election.\nThe auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disor", b'der.\nHer mother first to', b'ok her daughter swimming at the age of three to help her with her cerebal palsy.\nSep 19: Eibar (h) WON 6-1']
size of last chunk 698 bytes, 6 lines
second pass: lines processed 27

first pass: lines processed 96963
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked, but where the native Dutch were often less than warm to her and her fellow exiles.\nOne of the Ffarquhar ', ...,  b'the old device, Apple will give customers a gift card that can be applied toward the purchase of the new iPhone.']
size of last chunk 517905 bytes, 3037 lines
second pass: lines processed 100000

first pass: lines processed 99240
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked, but where the native Dutch were often less than warm to her and her fellow exiles.\nSoon Carroll was in push-up position walking her hands tow', b'ard the mirror at one side of the room while her feet were dragged along by the casual dinnerware.\nThe track "Getaway" was inspired by and allud', ..., b'the old device, Apple will give customers a gift card that can be applied toward the purchase of the new iPhone.']
size of last chunk 130259 bytes, 760 lines
second pass: lines processed 100000

В последнем примере показано, что вы можете обрабатывать 99 240 из 100 000 строк параллельно, но вам нужно обработать последние 760 строк (130 тыс.) После завершения всех процессов.

Примечание по параллелизму: каждый подпроцессвладеет фиксированной ячейкой списка remainders, следовательно, не должно быть повреждения памяти.Возможно, было бы лучше хранить каждый остаток в своем собственном объекте процесса (обертка вокруг реального подпроцесса) и объединять все оставшиеся после завершения процессов.

2 голосов
/ 27 апреля 2019

Вы были , поэтому близко!Относительно простое изменение вашего окончательного кода (чтение данных как bytes, а не str) заставляет все это (почти) работать.

Основная проблема заключалась в том, что чтение из двоичных файлов имеет значение байт , но чтение из текстовых файлов означает текст , и вы сделали свой первый подсчет в байтах, а второй - символов , что привело к вашим предположениям о том, какие данные уже были прочитаныошибаться.Речь идет о внутреннем, скрытом буфере.

Другие изменения:

  • Код должен разделяться на b'\n' вместо использования bytes.splitlines() и удалять только пустые строки после соответствующего кода обнаружения.
  • Если размер файла не изменится (в этом случае ваш существующий код будет поврежден в любом случае ), chunkify можно заменить более простым, более быстрый цикл, который функционально идентичен без необходимости держать файл открытым.

Это дает окончательный код:

from os import stat

def chunkify(pfin, buf_size=1024**2):
    file_end = stat(pfin).st_size

    i = -buf_size
    for i in range(0, file_end - buf_size, buf_size):
        yield i, buf_size, False

    leftover = file_end % buf_size
    if leftover == 0:  # if the last section is buf_size in size
        leftover = buf_size
    yield i + buf_size, leftover, True

def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
    with open(pfin, 'rb') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size)

    # Add previous leftover to current chunk
    chunk = leftover + chunk
    batch = chunk.split(b'\n')

    # If this chunk is not the last one,
    # pop the last item as that will be an incomplete sentence
    # We return this leftover to use in the next chunk
    if not is_last:
        leftover = batch.pop(-1)

    return [s.decode('utf-8') for s in filter(None, batch)], leftover


if __name__ == '__main__':
    fin = r'ep+gutenberg+news+wiki.txt'

    lines_n = 0
    left = b''
    for start, size, last in chunkify(fin):
        lines, left = process_batch(fin, start, size, last, left)

        if not lines:
            continue

        for line in lines:
            print(line)
        print('\n')

        numberlines = len(lines)
        lines_n += numberlines

    print(lines_n)
1 голос
/ 04 мая 2019

Когда файлы открываются в текстовом режиме (ваш второй пример кода), тогда read обрабатывает аргумент size как «количество символов» (не байтов), но seek и tell относятся к текущему позиция в файле для "пустого буфера", поэтому:

  • Вы можете рассчитать размер чанка (для использования read) из len(l)
  • использование file_end = stat(pfin).st_size для вычисления размера последнего чанка некорректно (поскольку для кодирования utf-8 число символов для нелатинских алфавитов может не совпадать с количеством используемых байтов)

  • f.tell() все еще не может использоваться для вычисления размера чанка, но дает правильный результат для chunk_start. Я думаю, что это как-то связано с буферизацией TextIOWrapper: tell дает информацию о состоянии буфера + декодера, а не о реальной позиции в текстовом потоке. Вы можете взглянуть на эталонную реализацию ( def _read_chunk , def tell ) и увидеть, что все это сложно, и никто не должен доверять дельтам, рассчитанным по разным tell / seek звонки ( "# Захватить весь декодированный текст (мы перемотаем все лишние биты позже)." дает еще один намек на причины "неправильных" позиций)

Правильно искать / указывать работу для «поиска», но его нельзя использовать для вычисления количества символов между tell -ами (и даже количество байтов будет неправильным). Для получения правильного двоичного небуферизованного режима byte deltas следует использовать (with open(path, 'rb', buffering=0) as f: ...), но в этом случае разработчик должен убедиться, что все чтения возвращают «полные символы» (в «utf-8» разные символы имеют разные байты). длина)

Но простое использование chunk_size + =len(l) решает все проблемы, поэтому вы можете продолжать открывать файлы в текстовом режиме! Следующая измененная версия вашего кода работает должным образом:

from functools import partial


def chunkify(pfin, max_lines=1500):
    with open(pfin, 'r', encoding='utf-8') as f:
        chunk_start = f.tell()
        chunk_size = 0
        done = True

        for idx, l in enumerate(iter(f.readline, '')):
            chunk_size += len(l)
            done = False
            if idx != 0 and idx % max_lines == 0:
                yield chunk_start, chunk_size
                done = True
                chunk_start = f.tell()
                chunk_size = 0

        if not done:
            yield chunk_start, chunk_size


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size).splitlines()

    batch = list(filter(None, chunk))

    return batch


if __name__ == '__main__':
    fin = r'data/100000-ep+gutenberg+news+wiki.txt'

    process_func = partial(process_batch, fin)
    lines_n = 0

    prev_last = ''
    for start, size in chunkify(fin):
        lines = process_func(start, size)

        if not lines:
            continue

        # print first and last ten sentences of batch
        for line in lines[:10]:
            print(line)
        print('...')
        for line in lines[-10:]:
            print(line)
        print('\n')

        lines_n += len(lines)

    print(lines_n)
...