У меня большой файл строки json (каждая строка является объектом json).Я хотел бы прочитать каждую строку и обрабатывать каждую строку параллельно.Вот что я получил:
import gzip
import multiprocessing as mp
import pandas as pd
from json import loads
def process_file(file_line):
json_line = loads(file_line)
data = json_line.get('data', None)
if data:
df = pd.DataFrame(data)
return df.groupby(['style']).score.describe()
return pd.DataFrame()
pool = mp.Pool(8)
jobs = []
with gzip.open('2018.jl.gz') as f:
for line in f:
jobs.append(pool.apply_async(process_file,(f)))
for job in jobs:
job.get()
pool.close()
Однако обработка занимает больше времени, чем чтение файла, и заканчивается добавлением слишком большого количества заданий и проблемами с памятью.
Есть ли способ, которым я мог бы распараллелить это??Как я мог закодировать это так, что каждый поток будет захватывать строку и обрабатывать ее, а когда она закончится, он получит новую строку, и цикл остановится, если все потоки заняты?