Ситуация:
У меня есть большой объем данных с веб-сайта социальной сети за 5 месяцев, разделенных минутами строк, отформатированных json файлов. См. Здесь для объяснения
Все данные находятся на сервере с почти 400 ГБ оперативной памяти и 32-ядерным процессором с 64 потоками. Я воспользовался этим с помощью модуля многопроцессорности python, однако мне в основном приходится выделить 29 процессов на синтаксический анализ, а остальные - на извлечение, запись и чтение / удаление. Производительность, кажется, не намного лучше, чем моя текущая конфигурация.
Я создал программу с 5 основными процессами, которые расширяют модуль python multiprocessing. Для каждого шага существует процесс:
extracting
: извлечение файлов tar, охватывающих 1 день из 1 минуты json .bz2 файлов.
reading
: чтение непосредственно из файлов bz2, send json dicts to parsing queue send message to delete module when finish with tar file.
deleting
: удаляет старые файлы tar, чтобы не использовать столько места на сервере
parsing
: анализирует json dicts для ключевых слов для категоризации, выравнивает их, помещает в фрейм данных и отправляет в очередь записи.
write
: объединяет фреймы данных из очереди записи в соответствии с категорией. когда основной фрейм данных имеет определенный размер, записать в выходной каталог и удалить.
Проблема:
Получение данных занимает слишком много времени. Если бы я запустил программу в том виде, в котором она есть сейчас, не останавливаясь, потребуется более 20 недель, чтобы проанализировать данные, которые мне действительно нужны, и разделить их на 4 широкие категории, которые я помещаю в фреймы данных и записываю в файлы hdf5.
Что я пробовал:
Я пытался получить json, поместив его в словарь построчно, например:
with open('file') as f:
for line in f:
json.loads(line)
# send data to queue for parse processes
Я также пытался преобразовать в другой тип данных, чтобы уменьшить накладные расходы на процесс синтаксического анализа:
with open('file') as f:
for line in f:
json.loads(line)
# categorize and convert to flattened dataframes and send
# to parse process
Я также пробовал другие вещи, например, использование pandas Series или Lists вместо python dicts, но это тоже не помогает.
текущий псевдокод для основного процесса синтаксического анализа:
while True:
while parse queue empty: wait 1 second
item = parse_queue.get
if parse_queue.get is None:
break
if item.get(keyword) is not None:
# flatten, convert to dataframe, send to write
# I do this for 4 different cases of items.
Я также пробовал отправлять pandas Series или Dataframes вместо python dicts. Кажется, это не помогает. Я пробовал сгладить диктатор перед рукой, это ничего не меняет.
доступ к python dict - это O (1). Процесс чтения, очевидно, достаточно быстр, чтобы создать и отправить python диктов в очередь на синтаксический анализ со скоростью 2-3 тыс. Диктов в секунду (после фильтрации нежелательного мусора json). Так почему же этот синтаксический анализ длится так долго? На синтаксический разбор уходит в 10–100 раз больше времени, чем на чтение файлов.
Что, я думаю, поможет:
Я считаю, что моя проблема в том, что может быть решена только путем синтаксического анализа непосредственно из файлового потока, но как я могу передать эти json объекты, если они находятся в таком формате файлов?
Мой главный вопрос:
Как я могу ускорить этот процесс синтаксического анализа? Нужно ли мне читать напрямую из файлового потока? Если да, то как я могу это сделать?
РЕШЕНИЕ ::
Когда вы отправляете много элементов через multiprocessing.Queue
, он должен десериализоваться при отправке, а затем потребитель должен повторно проанализировать элемент. Это значительно замедляет время обработки. Либо объедините множество мелких объектов в один большой объект, который вы можете отправить через очередь одновременно, либо подумайте об объединении двух процессов (например, чтения и анализа json), и это будет намного быстрее.
После Решив возникшую ранее проблему, объединив созданные мной классы read
и parse
, я увидел, что у меня также было огромное узкое место при записи. Я исправил это, объединив целые файлы, состоящие из проанализированных json объектов, в один список и отправив этот список процессу write
через очередь.
Ключевые выносы : очереди имеют много накладных расходов и требуют много времени на обработку до .get
, поэтому найдите способ минимизировать количество элементов, отправляемых через очередь, путем объединения процессов, которые излишне абстрагировать или объединять множество объектов в один большой.