Как эффективно и быстро анализировать большие объемы файлов строчного формата json - PullRequest
1 голос
/ 19 июня 2020

Ситуация:

У меня есть большой объем данных с веб-сайта социальной сети за 5 месяцев, разделенных минутами строк, отформатированных json файлов. См. Здесь для объяснения

Все данные находятся на сервере с почти 400 ГБ оперативной памяти и 32-ядерным процессором с 64 потоками. Я воспользовался этим с помощью модуля многопроцессорности python, однако мне в основном приходится выделить 29 процессов на синтаксический анализ, а остальные - на извлечение, запись и чтение / удаление. Производительность, кажется, не намного лучше, чем моя текущая конфигурация.

Я создал программу с 5 основными процессами, которые расширяют модуль python multiprocessing. Для каждого шага существует процесс:

extracting: извлечение файлов tar, охватывающих 1 день из 1 минуты json .bz2 файлов.

reading: чтение непосредственно из файлов bz2, send json dicts to parsing queue send message to delete module when finish with tar file.

deleting: удаляет старые файлы tar, чтобы не использовать столько места на сервере

parsing: анализирует json dicts для ключевых слов для категоризации, выравнивает их, помещает в фрейм данных и отправляет в очередь записи.

write: объединяет фреймы данных из очереди записи в соответствии с категорией. когда основной фрейм данных имеет определенный размер, записать в выходной каталог и удалить.

Проблема:

Получение данных занимает слишком много времени. Если бы я запустил программу в том виде, в котором она есть сейчас, не останавливаясь, потребуется более 20 недель, чтобы проанализировать данные, которые мне действительно нужны, и разделить их на 4 широкие категории, которые я помещаю в фреймы данных и записываю в файлы hdf5.

Что я пробовал:

Я пытался получить json, поместив его в словарь построчно, например:

with open('file') as f:
for line in f:
    json.loads(line)
    # send data to queue for parse processes

Я также пытался преобразовать в другой тип данных, чтобы уменьшить накладные расходы на процесс синтаксического анализа:

with open('file') as f:
for line in f:
    json.loads(line)
    # categorize and convert to flattened dataframes and send
    # to parse process

Я также пробовал другие вещи, например, использование pandas Series или Lists вместо python dicts, но это тоже не помогает.

текущий псевдокод для основного процесса синтаксического анализа:

while True:
    while parse queue empty: wait 1 second
    item = parse_queue.get
    if parse_queue.get is None:
        break
    if item.get(keyword) is not None:
       # flatten, convert to dataframe, send to write
      # I do this for 4 different cases of items.

Я также пробовал отправлять pandas Series или Dataframes вместо python dicts. Кажется, это не помогает. Я пробовал сгладить диктатор перед рукой, это ничего не меняет.

доступ к python dict - это O (1). Процесс чтения, очевидно, достаточно быстр, чтобы создать и отправить python диктов в очередь на синтаксический анализ со скоростью 2-3 тыс. Диктов в секунду (после фильтрации нежелательного мусора json). Так почему же этот синтаксический анализ длится так долго? На синтаксический разбор уходит в 10–100 раз больше времени, чем на чтение файлов.

Что, я думаю, поможет:

Я считаю, что моя проблема в том, что может быть решена только путем синтаксического анализа непосредственно из файлового потока, но как я могу передать эти json объекты, если они находятся в таком формате файлов?

Мой главный вопрос:

Как я могу ускорить этот процесс синтаксического анализа? Нужно ли мне читать напрямую из файлового потока? Если да, то как я могу это сделать?

РЕШЕНИЕ ::

Когда вы отправляете много элементов через multiprocessing.Queue, он должен десериализоваться при отправке, а затем потребитель должен повторно проанализировать элемент. Это значительно замедляет время обработки. Либо объедините множество мелких объектов в один большой объект, который вы можете отправить через очередь одновременно, либо подумайте об объединении двух процессов (например, чтения и анализа json), и это будет намного быстрее.

После Решив возникшую ранее проблему, объединив созданные мной классы read и parse, я увидел, что у меня также было огромное узкое место при записи. Я исправил это, объединив целые файлы, состоящие из проанализированных json объектов, в один список и отправив этот список процессу write через очередь.

Ключевые выносы : очереди имеют много накладных расходов и требуют много времени на обработку до .get, поэтому найдите способ минимизировать количество элементов, отправляемых через очередь, путем объединения процессов, которые излишне абстрагировать или объединять множество объектов в один большой.

1 Ответ

0 голосов
/ 19 июня 2020

Если вам просто нужен более быстрый json синтаксический анализ, вы можете посмотреть u json (https://pypi.org/project/ujson/)

вместо

import json
parsed = json.loads(line)

you ' d write

import ujson
parsed = ujson.loads(line)

Однако, если вы очень хорошо знаете файл json, вы можете попытаться извлечь информацию, которую хотите извлечь, с помощью специального регулярного выражения или специального синтаксического анализатора, но такие решения могут сломаться при определенных обстоятельства, за исключением того, что они очень хорошо написаны, поскольку они могут зависеть от строки json, чтобы иметь больше специфики, чем требуется стандартом json.

Помимо этого, я предлагаю вам профилировать свой код, чтобы узнать, что точно замедляет вас.

Возможно, стоит связать что-то вроде этого:

один процесс читает один файл, читает n строк (например, 50) делегирует синтаксический анализ (ujson.loads()) из этих 50 строк в рабочий пул.

Многопроцессорность имеет некоторые накладные расходы. если вы делегируете очень крошечные задачи / элементы (например, всего одну строку), то накладные расходы на многопроцессорную обработку могут быть больше, чем то, что вы получаете, распределяя рабочую нагрузку)

...