Оптимизация Scatter - PullRequest
       7

Оптимизация Scatter

1 голос
/ 07 марта 2019

У меня большая проблема с данными.Конкретная проблема не супер важна, но я решил ее с помощью dask.Теперь у меня две проблемы.

from dask import distributed
import numpy as np

local_cluster = distributed.LocalCluster(n_workers=20, processes=True, memory_limit=0)
dask_client = distributed.Client(local_cluster)

hat_matrix = np.random.rand(1000,25000)
weight_matrix = np.random.rand(1000)
Y = np.random.rand(1000, 25000)

[scatter_hat] = dask_client.scatter([hat_matrix], broadcast=True)
[scatter_weight] = dask_client.scatter([weight_matrix], broadcast=True)

futures = [dask_client.submit(apply_function, i, scatter_hat, Y[i, :], scatter_weight)
           for i in range(Y.shape[0])]

results = dask_client.gather(futures)

Я могу разделить Y (что хорошо, потому что у меня недостаточно памяти, чтобы загрузить все сразу), но все мои работники нуждаютсяhat_matrix.Рассеяние hat_matrix и последующая отправка Y по строкам отлично работает.За исключением того, что hat_matrix и Y оба ... большие, это нормально.У меня достаточно памяти, чтобы справиться с этим.Но я не могу найти способ разрешить короткие всплески памяти (которые происходят во время десериализации), поэтому, если я установлю ограничение памяти, няня убьет всех моих работников.Тогда все мои новые рабочие.И так далее.Итак, у меня есть три вопроса:

Есть ли способ установить предел памяти, который разрешает всплески при поступлении и распаковке сериализованных данных?Если у меня есть 64 ГБ памяти для 20 процессов, я бы хотел установить ограничение памяти, скажем, 2,8 ГБ на процесс.Когда я разбрасываю 2 ГБ данных, на десериализацию приходится ~ 4 ГБ на процесс, и няня убивает все.

Есть ли способ пошатнуть рассеяние, чтобы минимизировать скачок переходной памяти?

Есть ли удобный способ разбрасывать данные по дискам, а не по TCP, или мне нужно это записать на заказ?(Как следствие: есть ли у всех моих работников удобный способ загрузить массив dask с отображением в память из сериализованного файла?)

1 Ответ

1 голос
/ 10 марта 2019

Есть ли способ установить предел памяти, который разрешает выбросы при поступлении и распаковке сериализованных данных?

Обычно десериализация запускает произвольный код, поэтому dask не может реально контролировать то, что происходит. На практике, хотя ваши матрицы на самом деле не такие большие, хотя по сравнению с типичной памятью, доступной на современном оборудовании, я удивляюсь, что вы столкнулись с проблемой. Dask достаточно осторожен с массивами NumPy. Я не ожидал, что он будет использовать гораздо больше памяти, чем размер массива.

Есть ли способ ошеломить рассеяние, чтобы минимизировать скачок времени в памяти?

Рассеяние в настоящее время проходит через широковещательное дерево. Ваш клиент отправляет нескольким работникам, затем они отправляют еще нескольким работникам и так далее. По умолчанию коэффициент ветвления здесь только два, поэтому я был бы удивлен, увидев здесь огромный взрыв.

Есть ли удобный способ разбрасывать данные по дискам, а не по TCP, или мне нужно это записать на заказ? (Как следствие: есть ли у всех моих работников удобный способ загрузить массив dask с отображением в память из сериализованного файла?)

Возможно, вы могли бы использовать какой-либо массив NumPy с отображением в памяти, а не массив NumPy в памяти?

...