У меня большая проблема с данными.Конкретная проблема не супер важна, но я решил ее с помощью dask.Теперь у меня две проблемы.
from dask import distributed
import numpy as np
local_cluster = distributed.LocalCluster(n_workers=20, processes=True, memory_limit=0)
dask_client = distributed.Client(local_cluster)
hat_matrix = np.random.rand(1000,25000)
weight_matrix = np.random.rand(1000)
Y = np.random.rand(1000, 25000)
[scatter_hat] = dask_client.scatter([hat_matrix], broadcast=True)
[scatter_weight] = dask_client.scatter([weight_matrix], broadcast=True)
futures = [dask_client.submit(apply_function, i, scatter_hat, Y[i, :], scatter_weight)
for i in range(Y.shape[0])]
results = dask_client.gather(futures)
Я могу разделить Y
(что хорошо, потому что у меня недостаточно памяти, чтобы загрузить все сразу), но все мои работники нуждаютсяhat_matrix
.Рассеяние hat_matrix
и последующая отправка Y
по строкам отлично работает.За исключением того, что hat_matrix
и Y
оба ... большие, это нормально.У меня достаточно памяти, чтобы справиться с этим.Но я не могу найти способ разрешить короткие всплески памяти (которые происходят во время десериализации), поэтому, если я установлю ограничение памяти, няня убьет всех моих работников.Тогда все мои новые рабочие.И так далее.Итак, у меня есть три вопроса:
Есть ли способ установить предел памяти, который разрешает всплески при поступлении и распаковке сериализованных данных?Если у меня есть 64 ГБ памяти для 20 процессов, я бы хотел установить ограничение памяти, скажем, 2,8 ГБ на процесс.Когда я разбрасываю 2 ГБ данных, на десериализацию приходится ~ 4 ГБ на процесс, и няня убивает все.
Есть ли способ пошатнуть рассеяние, чтобы минимизировать скачок переходной памяти?
Есть ли удобный способ разбрасывать данные по дискам, а не по TCP, или мне нужно это записать на заказ?(Как следствие: есть ли у всех моих работников удобный способ загрузить массив dask с отображением в память из сериализованного файла?)