Я пытаюсь использовать dask.delayed
для вычисления большой матрицы для использования в последующих вычислениях. Я только когда-либо запускаю код на одной локальной машине. Когда я использую dask
планировщик для одной машины, он работает нормально, но немного медленно. Чтобы получить доступ к дополнительным параметрам и мониторам производительности для улучшения кода, я хотел бы использовать dask.distributed
на одной машине. Однако выполнение того же кода с клиентом dask.distributed
медленно израсходует всю доступную память и завершается сбоем, ничего не добившись.
Есть ли другой способ решения проблемы, который позволил бы клиенту dask.distributed
завершить работу с большей эффективностью памяти?
- Я прочитал dask.delayed Руководство по передовому опыту и думаю, что мы правильно его используем.
- Я запустил его на локальном ПК Win 10 (64 ГБ ОЗУ) и виртуальной машине Azure Win Server 2012 (256 ГБ) с тем же результатом.
- Я пробовал устанавливать чанки вручную.
- Я пытался использовать
stack.rechunk
для оптимизации размеров чанков, включая автоматическое чанкинг по строкам и столбцам (чанки строк, кажется, работают намного быстрее в планировщике dask
).
- Я пытался использовать
compute()
и persist()
(тот же результат).
- Я попытался запустить клиент
dask.distributed
с планировщиком потоков и процессов и настроить количество рабочих. threads
использует еще больше оперативной памяти перед смертью.
- Я попытался настроить
dask.distributed
с ограничением памяти cluster = distributed.LocalCluster(memory_limit = 8e9)
согласно этому ответу , но ограничение памяти игнорируется.
- Если я уменьшу размер проблемы (
nX
и nY
ниже), клиент dask.distributed
выполнит задачу, однако ему все равно потребуется значительно больше времени и памяти, чем планировщику dask
.
Этот пример воспроизводит проблему:
import dask
import distributed
import numpy as np
import dask.array as da
def calcRow(X,Y):
Tx = np.transpose(X * (X + Y)) # Simplified work
return (Tx)
# Specify size of (nY x nX) matrix
nX = 1000000 # Distributed fails with nX >= 1000000 and nY >= 5000
nY = 5000
# Fill with random data
x = np.random.rand(nX,1)
y = np.random.rand(nY,1)
# Setup dask.distributed client.
# Comment out these two lines to use the standard dask scheduler,
# which does work
client = distributed.Client()
client
# Build the matrix
row = dask.delayed(calcRow, pure=True) # Build 1 row
makeRows = [row(x, y[ii]) for ii in range(nY)] # Loop for all nY rows
buildMat = [da.from_delayed(makeRow, dtype=float, shape=(10,nX))
for makeRow in makeRows] # Build matrix
stack = da.vstack(buildMat)
my_matrix = stack.compute() # Calculate the matrix entries
На самом деле мои проблемы гораздо больше, и calcRow
сам по себе большой, медленный и сложный расчет, но шаги формы и построения матрицы совпадают.
Я понимаю, что лучше всего scatter
хранить данные в памяти перед вызовом compute
, но у меня нет функции для разброса, просто массив delayed
.
Если я закомментирую 2 dask.distributed
клиентских строк, приведенный выше пример будет работать в течение 60 секунд, используя максимум 0,25 ГБ ОЗУ. Но с этими строками код поднимается до полного использования памяти (64 ГБ) за 3-4 минуты и продолжает работать, пока система не станет нестабильной.
Если я построю матрицу в dask
, я смогу запустить клиент dask.distributed
и использовать матрицу в последующих dask.distributed
вычислениях без проблем. Это просто построение матрицы, которая вызывает проблемы.
Я почти чувствую, что это ошибка, но не уверен, что мой код не виноват. Я действительно ценю предложения, которые могут заставить код работать или доказать ошибку.
РЕДАКТИРОВАТЬ 1:
Я также попытался применить декоратор к calcRow
:
@dask.delayed
def calcRow(X,Y):
и использование:
makeRows = [calcRow(x, y[ii]) for ii in range(nY)]
а это, кажется, идентично?
РЕДАКТИРОВАТЬ 2:
Если я запускаю distributed.client
с processes=False
, он быстрее использует всю системную память, но на самом деле выдает следующее предупреждение, которое может быть диагностическим:
distrib.worker - ПРЕДУПРЕЖДЕНИЕ - Использование памяти велико, но работник не имеет данных для
сохранить на диск. Возможно, какой-то другой процесс вызывает утечку памяти? Процесс
память: 40,27 ГБ - ограничение рабочей памяти: 8,00 ГБ