Как написать параллельно файл с joblib? Проблема JoinableQueue - PullRequest
2 голосов
/ 04 апреля 2020

Я пытаюсь записать в один файл результаты вычислений, которые выполняются более 100k + файлов. Обработка файла занимает ~ 1 с и записывает одну строку в выходной файл. Сама проблема "смущающе параллельна", я только борюсь с правильной записью в файл (скажем, CSV). Вот что долго работало для меня go (Python 3.4?):

import os
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def save_to_file(q):
    with open('test.csv', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')
        q.task_done()

def process(x):
    q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
    q = JoinableQueue()
    p = Process(target=save_to_file, args=(q,))
    p.start()
    Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
    q.put(None) 
    p.join() 

Сегодня (на Python 3.6+) выдается следующее исключение:

joblib.externals.loky.process_executor._RemoteTraceback: 
"""
(...)
RuntimeError: JoinableQueue objects should only be shared between processes through inheritance
"""

Как правильно записать в один файл с joblib?

1 Ответ

0 голосов
/ 04 апреля 2020

Оказывается, один из способов решения этой задачи - через multiprocessing.Manager, например:

import os
from multiprocessing import Process, Manager
from joblib import Parallel, delayed

def save_to_file(q):
    with open('test.csv', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')

def process(x):
    q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
    m = Manager()
    q = m.Queue()
    p = Process(target=save_to_file, args=(q,))
    p.start()
    Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
    q.put(None)
    p.join()

Мы разрешаем Manager управлять контекстом, остальное остается тем же (если не использовать обычный Queue вместо JoinableQueue).

Если кто-нибудь знает более приятный способ, я буду рад принять его в качестве ответа.

...