Joblib не пишет в общую память - PullRequest
0 голосов
/ 19 декабря 2018

У меня есть большой defaultdict(dict) массив numpy.Я хочу сохранить его в общей памяти и записать в него, а также использовать его после его полного формирования.

Я использую joblib для этой цели.


from collections import defaultdict
import os, tempfile
from joblib import Parallel, delayed, load, dump
import numpy as np

def mean(data, i):
    return np.mean([np.mean(data[i][j]) for j in data[i]])

class BigData(object):
    def __init__(self, tmp_dir="/mnt/", workers = 10):
        self.big_data = defaultdict(dict)
        self.workers = workers

        assert os.path.isdir(tmp_dir), "tmp_dir does not exists"
        self.temp_folder = tempfile.mkdtemp(dir=tmp_dir)
        filename = os.path.join(self.temp_folder, "big_data.json")
        if os.path.exists(filename): os.unlink(filename)
        dump(self.big_data, filename)
        self.big_data = load(filename, mmap_mode="w+")
        print("Memory map of big_data: {}".format(filename))

        self.fill_big_data()
        # dump(self.big_data, filename)
        # self.big_data = load(filename, mmap_mode="r+")
        self.use_big_data()

    def fill_big_data(self):
        for i in range(100):
          for j in range(10):
            self.big_data[i][j] = list(np.ones(int(1e6)))

    def use_big_data(self):
        results = Parallel(n_jobs = self.workers)(delayed(mean)(self.big_data,i) for i in  range(10))
        print(results)



bd = BigData(tmp_dir="/mnt/tmp_data")

Выше приведен игрушечный пример того, что я хочу сделать.Я не хочу менять self.big_data на numpy array.

Этот код вылетает при выполнении Parallel.Если я правильно понимаю, joblib запустит несколько независимых процессов в серверной части с предоставленными аргументами.Аргументы каждого процесса будут скопированы, поэтому становится невозможным передавать self.big_data нескольким процессам.

Однако, когда я создал memmap из self.big_data, он, кажется, заполняет его в памяти, а не записывает в файл.Я понимаю, что это делает это, потому что иначе скорость будет проблемой.

Я хочу предположить, что, поскольку self.big_data является memmap, когда вызывается Parallel, процессы должны передать указатель на memmap.Вместо этого он копирует весь словарь для каждого процесса.

Передача temp_folder также, похоже, не помогает.

Parallel(n_jobs = self.workers, temp_folder="/mnt/")(delayed(mean)(self.big_data,i) for i in  range(10))

Может ли кто-нибудь пролить свет на то, когда memmap передается в качестве указателя вместо структуры данных?

...