Многопроцессорная обработка большого количества данных в python - PullRequest
0 голосов
/ 03 апреля 2020

Я пытаюсь использовать многопроцессорность как способ ускорить обработку моих данных. Мои данные состоят из 3000 json файлов, и мой код выглядит примерно так:

def analyse_page(file, arg1, arg2, arg3, arg4):
    with open(file) as f:
        data = json.load(f)
    for i in range(data):
        data[i] = treat_item(data[i], arg1, arg2, arg3, arg4)
    with open(output_json, 'w') as f:
        json.dump(f,data)

for file in files:
    analyse_page(file, arg1, arg2, arg3, arg4)

print('done!')

Таким образом, идея состоит в том, чтобы обработать элементы json, а затем вывести измененный json. Я вижу, что мой компьютер использует 15% мощности процессора для простого For oop, поэтому я решил использовать многопроцессорность, но у меня возникла проблема, которую я не могу понять. Я уже пробовал Process и Pool, как по частям, так и целиком, однако каждый раз он всегда может сделать треть файлов, а затем сценарий останавливается без ошибок!

Поэтому я снова запускаю код, используя if os.path.exists(): continue, чтобы обработанные файлы игнорировались. И даже так, он делает еще одну треть файлов и останавливается. Поэтому, когда я запускаю его еще раз, он выполняет еще одну треть, а затем печатает done!

Функция analyse_page занимает примерно 3 с на страницу, поэтому каков правильный способ запуска той же функции в многопроцессорной обработке? давно?

Обновление, что я уже сделал:

Обработка

processes = []
for file in files:
    p = multiprocessing.Process(target=analyse_page, args=(file, arg1, arg2, arg3, arg4,))
    processes.append(p)
    p.start()
for process in processes:
    process.join()

Процесс с партией

def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

processes = []
numberOfThreads = 6 #Max is 8
For file in files:
    p = multiprocessing.Process(target=analyse_page, args=(file, arg1, arg2, arg3, arg4,))
    processes.append(p)

for i in chunks(processes,numberOfThreads):
    for j in i:
        j.start()
    for j in i:
        j.join()

Пул

pool = multiprocessing.Pool(6)
For file in files:
    pool.map(analyse_page, (file, arg1, arg2, arg3, arg4,))
pool.close() 

1 Ответ

0 голосов
/ 03 апреля 2020

Для простой обработки многопроцессорной обработки вы можете использовать модуль concurrent.futures .

Python Документация: Concurrent Futures

До Я объясняю каждый аспект, есть большое и простое видеоурок с примером кода (легко адаптировать):

YouTube: учебник по многопроцессорной обработке

Для обработки многих задач с несколькими процессорами или потоками я рекомендую модуль очереди.

Python Документация: очередь

from queue import Queue

#Create Queue object
q = Queue()

#Put item to queue
q.put("my value")

#Get and process each item in queue and remove it
while not q.empty():
    myValue = q.get()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...