Как разобрать большой файл, используя многопоточность в Python? - PullRequest
2 голосов
/ 19 марта 2012

У меня огромный файл, и мне нужно его прочитать и обработать.

with open(source_filename) as source, open(target_filename) as target:
    for line in source:
        target.write(do_something(line))

    do_something_else()

Можно ли это ускорить с помощью потоков? Если я создаю поток в строке, это будет иметь огромные накладные расходы?

edit: Чтобы вопрос не обсуждался, Как должен выглядеть код?

with open(source_filename) as source, open(target_filename) as target:
   ?

@ Nicoretti: на итерации мне нужно прочитать строку из нескольких килобайт данных.

обновление 2: файл может быть bz2, поэтому Python, возможно, придется ждать распаковки:

$ bzip2 -d country.osm.bz2 | ./my_script.py

Ответы [ 4 ]

7 голосов
/ 19 марта 2012

Вы можете использовать три потока: для чтения, обработки и записи. Возможное преимущество заключается в том, что обработка может происходить во время ожидания ввода-вывода, но вам нужно самостоятельно потратить некоторое время, чтобы увидеть, есть ли реальная выгода в вашей ситуации.

import threading
import Queue

QUEUE_SIZE = 1000
sentinel = object()

def read_file(name, queue):
    with open(name) as f:
        for line in f:
            queue.put(line)
    queue.put(sentinel)

def process(inqueue, outqueue):
    for line in iter(inqueue.get, sentinel):
        outqueue.put(do_something(line))
    outqueue.put(sentinel)

def write_file(name, queue):
    with open(name, "w") as f:
        for line in iter(queue.get, sentinel):
            f.write(line)

inq = Queue.Queue(maxsize=QUEUE_SIZE)
outq = Queue.Queue(maxsize=QUEUE_SIZE)

threading.Thread(target=read_file, args=(source_filename, inq)).start()
threading.Thread(target=process, args=(inq, outq)).start()
write_file(target_filename, outq)

Хорошей идеей будет установить maxsize для очередей, чтобы предотвратить постоянно растущее потребление памяти. Значение 1000 - это произвольный выбор с моей стороны.

3 голосов
/ 19 марта 2012

Это именно то, что вы не должны пытаться проанализировать априори , но вместо этого следует профилировать.

Имейте в виду, что многопоточность поможет только в том случае, если интенсивная обработка по каждой строке. Альтернативная стратегия заключается в том, чтобы поместить весь файл в память и обработать его в памяти, что вполне может устранить необходимость в многопоточности.

Если у вас есть поток на строку, это опять-таки что-то для тонкой настройки, но я предполагаю, что, если разбор строк не является довольно тяжелым, вы можете использовать фиксированное количество рабочих потоков.

Существует еще одна альтернатива: подпроцессы порождения, и пусть они выполняют чтение и обработку. Учитывая ваше описание проблемы, я ожидаю, что это даст вам наибольшее ускорение. Вы могли бы даже использовать какую-то систему кэширования в памяти для ускорения чтения, такую ​​как memcached (или любую из аналогичных систем, или даже реляционную базу данных).

3 голосов
/ 19 марта 2012

В CPython многопоточность ограничена глобальной блокировкой интерпретатора - фактически только один поток одновременно может выполнять код Python.Так что многопоточность приносит вам пользу только в том случае, если:

  1. вы выполняете обработку, которая не требует глобальной блокировки интерпретатора;или

  2. вы тратите время на блокировку ввода-вывода.

Примеры (1) включают применение фильтра к изображению в Библиотека изображений Python , или поиск собственных значений матрицы в numpy .Примеры (2) включают ожидание ввода данных пользователем или ожидание завершения отправки данных сетевым подключением.

Таким образом, может ли ваш код быть ускорен с помощью потоков в CPython, зависит от того, что именно вы делаете в do_something вызов.(Но если вы анализируете строку в Python, то очень маловероятно, что вы сможете ускорить это, запустив потоки.) Также следует учесть, что если вы начнете запускать потоки, тогда вы столкнетесь с проблемой синхронизации при записи результатов вцелевой файл.Нет никакой гарантии, что потоки будут завершены в том же порядке, в котором они были запущены, поэтому вам нужно позаботиться о том, чтобы выходные данные выходили в правильном порядке.

Вот реализация с максимальной резьбой, в которой есть потокидля чтения ввода, записи вывода и одного потока для обработки каждой строки.Только тестирование покажет вам, является ли это быстрее или медленнее, чем однопоточная версия (или версия Джанна только с тремя потоками).

from threading import Thread
from Queue import Queue

def process_file(f, source_filename, target_filename):
    """
    Apply the function `f` to each line of `source_filename` and write
    the results to `target_filename`. Each call to `f` is evaluated in
    a separate thread.
    """
    worker_queue = Queue()
    finished = object()

    def process(queue, line):
        "Process `line` and put the result on `queue`."
        queue.put(f(line))

    def read():
        """
        Read `source_filename`, create an output queue and a worker
        thread for every line, and put that worker's output queue onto
        `worker_queue`.
        """
        with open(source_filename) as source:
            for line in source:
                queue = Queue()
                Thread(target = process, args=(queue, line)).start()
                worker_queue.put(queue)
        worker_queue.put(finished)

    Thread(target = read).start()
    with open(target_filename, 'w') as target:
        for output in iter(worker_queue.get, finished):
            target.write(output.get())
3 голосов
/ 19 марта 2012

Стадия обработки занимает относительно много времени, т. Е. Интенсивна ли она?Если нет, то нет, вы не выиграете много, поточив или многопроцессорный его.Если ваша обработка стоит дорого, тогда да.Итак, вам нужно в профиле, чтобы знать наверняка.

Если вы тратите относительно больше времени на чтение файла, т. Е. Он больше, чем на его обработку, то вы не сможете выиграть в производительности, используя потоки, узким местом является просто ввод-вывод, который потоки не улучшают.*

...