У меня есть большой defaultdict(dict)
массив numpy.Я хочу сохранить его в общей памяти и записать в него, а также использовать его после его полного формирования.
Я использую joblib для этой цели.
from collections import defaultdict
import os, tempfile
from joblib import Parallel, delayed, load, dump
import numpy as np
def mean(data, i):
return np.mean([np.mean(data[i][j]) for j in data[i]])
class BigData(object):
def __init__(self, tmp_dir="/mnt/", workers = 10):
self.big_data = defaultdict(dict)
self.workers = workers
assert os.path.isdir(tmp_dir), "tmp_dir does not exists"
self.temp_folder = tempfile.mkdtemp(dir=tmp_dir)
filename = os.path.join(self.temp_folder, "big_data.json")
if os.path.exists(filename): os.unlink(filename)
dump(self.big_data, filename)
self.big_data = load(filename, mmap_mode="w+")
print("Memory map of big_data: {}".format(filename))
self.fill_big_data()
# dump(self.big_data, filename)
# self.big_data = load(filename, mmap_mode="r+")
self.use_big_data()
def fill_big_data(self):
for i in range(100):
for j in range(10):
self.big_data[i][j] = list(np.ones(int(1e6)))
def use_big_data(self):
results = Parallel(n_jobs = self.workers)(delayed(mean)(self.big_data,i) for i in range(10))
print(results)
bd = BigData(tmp_dir="/mnt/tmp_data")
Выше приведен игрушечный пример того, что я хочу сделать.Я не хочу менять self.big_data
на numpy array
.
Этот код вылетает при выполнении Parallel.Если я правильно понимаю, joblib
запустит несколько независимых процессов в серверной части с предоставленными аргументами.Аргументы каждого процесса будут скопированы, поэтому становится невозможным передавать self.big_data нескольким процессам.
Однако, когда я создал memmap
из self.big_data
, он, кажется, заполняет его в памяти, а не записывает в файл.Я понимаю, что это делает это, потому что иначе скорость будет проблемой.
Я хочу предположить, что, поскольку self.big_data
является memmap
, когда вызывается Parallel, процессы должны передать указатель на memmap
.Вместо этого он копирует весь словарь для каждого процесса.
Передача temp_folder также, похоже, не помогает.
Parallel(n_jobs = self.workers, temp_folder="/mnt/")(delayed(mean)(self.big_data,i) for i in range(10))
Может ли кто-нибудь пролить свет на то, когда memmap передается в качестве указателя вместо структуры данных?