Быстрый способ для нескольких процессов для чтения из центрального источника в Python? - PullRequest
2 голосов
/ 22 марта 2011

Я ищу быстрый способ для нескольких процессов (в multiprocessing.Pool ()) для чтения из центрального источника данных.В настоящее время у меня есть файл, который читается в очередь (с помощью multiprocessing.Manager (). Queue ()), затем запускается рабочий пул и его процессы считываются из этой очереди.Это работает нормально, но когда я работаю с файлами размером несколько ГБ, это становится проблемой, поскольку управляемая очередь в ~ 7 раз медленнее обычной очереди Python.

Я думаю, что это доспособ, которым менеджер находится в отдельном процессе, и он должен обмениваться данными через сокет, а не напрямую с памятью.

Вот код, который я использую (функция get_records просто читает bytestream для каждой записи изфайл и возвращает его)

from multiprocessing import Manager
manager = Manager()
mgr_q = manager.Queue()
map(mgr_q.put, get_records(f))

Так, может быть, есть лучший способ справиться с этим?


Вот некоторые статистические данные о скорости чтения одного из моих файлов данных (~ 3 ГБ) в различные типы данных:

Чтение в обычный список Python.Скорость составляет 229,377 МБ / с

l = []
map(l.append, get_records(f))

Чтение в обычную очередь.Скорость составляет 74,035 МБ / с

import Queue
q = Queue.Queue()
map(q.put, get_records(f))

Чтение в очередь multiprocessing.queues.Скорость составляет 67,718 МБ / с

from multiprocessing import Queue
mq = Queue()
map(mq.put, get_records(f))

Наконец чтение в управляемую очередь.Скорость составляет 9,568 МБ / с

from multiprocessing import Manager
manager = Manager()
mgr_q = manager.Queue()
map(mgr_q.put, get_records(f))

Тарифы рассчитываются по rate = duration / filesize / 1024 / 1024

1 Ответ

3 голосов
/ 22 марта 2011

Если вы просто выполняете чтение по файлу, можно одновременно считывать несколько процессов. Вместо передачи данных в очередь, просто передайте смещение и счет. Чем в работниках занимаются:

f.seek(offset)
get_records(f, count)
...