Огромная разница в использовании памяти между dask и dask.distributed - PullRequest
2 голосов
/ 26 июня 2019

Я пытаюсь использовать dask.delayed для вычисления большой матрицы для использования в последующих вычислениях. Я только когда-либо запускаю код на одной локальной машине. Когда я использую dask планировщик для одной машины, он работает нормально, но немного медленно. Чтобы получить доступ к дополнительным параметрам и мониторам производительности для улучшения кода, я хотел бы использовать dask.distributed на одной машине. Однако выполнение того же кода с клиентом dask.distributed медленно израсходует всю доступную память и завершается сбоем, ничего не добившись.

Есть ли другой способ решения проблемы, который позволил бы клиенту dask.distributed завершить работу с большей эффективностью памяти?

  • Я прочитал dask.delayed Руководство по передовому опыту и думаю, что мы правильно его используем.
  • Я запустил его на локальном ПК Win 10 (64 ГБ ОЗУ) и виртуальной машине Azure Win Server 2012 (256 ГБ) с тем же результатом.
  • Я пробовал устанавливать чанки вручную.
  • Я пытался использовать stack.rechunk для оптимизации размеров чанков, включая автоматическое чанкинг по строкам и столбцам (чанки строк, кажется, работают намного быстрее в планировщике dask).
  • Я пытался использовать compute() и persist() (тот же результат).
  • Я попытался запустить клиент dask.distributed с планировщиком потоков и процессов и настроить количество рабочих. threads использует еще больше оперативной памяти перед смертью.
  • Я попытался настроить dask.distributed с ограничением памяти cluster = distributed.LocalCluster(memory_limit = 8e9) согласно этому ответу , но ограничение памяти игнорируется.
  • Если я уменьшу размер проблемы (nX и nY ниже), клиент dask.distributed выполнит задачу, однако ему все равно потребуется значительно больше времени и памяти, чем планировщику dask.

Этот пример воспроизводит проблему:

import dask
import distributed
import numpy as np
import dask.array as da

def calcRow(X,Y):
    Tx = np.transpose(X * (X + Y)) # Simplified work
    return (Tx)

# Specify size of (nY x nX) matrix
nX = 1000000 #  Distributed fails with nX >= 1000000 and nY >= 5000
nY = 5000

# Fill with random data
x = np.random.rand(nX,1)
y = np.random.rand(nY,1)

# Setup dask.distributed client.
# Comment out these two lines to use the standard dask scheduler,
# which does work 
client = distributed.Client()
client

# Build the matrix
row = dask.delayed(calcRow, pure=True)   # Build 1 row
makeRows = [row(x, y[ii]) for ii in range(nY)] # Loop for all nY rows
buildMat = [da.from_delayed(makeRow, dtype=float, shape=(10,nX))
            for makeRow in makeRows] # Build matrix
stack = da.vstack(buildMat)
my_matrix = stack.compute() # Calculate the matrix entries

На самом деле мои проблемы гораздо больше, и calcRow сам по себе большой, медленный и сложный расчет, но шаги формы и построения матрицы совпадают.

Я понимаю, что лучше всего scatter хранить данные в памяти перед вызовом compute, но у меня нет функции для разброса, просто массив delayed.

Если я закомментирую 2 dask.distributed клиентских строк, приведенный выше пример будет работать в течение 60 секунд, используя максимум 0,25 ГБ ОЗУ. Но с этими строками код поднимается до полного использования памяти (64 ГБ) за 3-4 минуты и продолжает работать, пока система не станет нестабильной.

Если я построю матрицу в dask, я смогу запустить клиент dask.distributed и использовать матрицу в последующих dask.distributed вычислениях без проблем. Это просто построение матрицы, которая вызывает проблемы.

Я почти чувствую, что это ошибка, но не уверен, что мой код не виноват. Я действительно ценю предложения, которые могут заставить код работать или доказать ошибку.

РЕДАКТИРОВАТЬ 1: Я также попытался применить декоратор к calcRow:

@dask.delayed
def calcRow(X,Y):

и использование:

makeRows = [calcRow(x, y[ii]) for ii in range(nY)]

а это, кажется, идентично?

РЕДАКТИРОВАТЬ 2: Если я запускаю distributed.client с processes=False, он быстрее использует всю системную память, но на самом деле выдает следующее предупреждение, которое может быть диагностическим:

distrib.worker - ПРЕДУПРЕЖДЕНИЕ - Использование памяти велико, но работник не имеет данных для сохранить на диск. Возможно, какой-то другой процесс вызывает утечку памяти? Процесс память: 40,27 ГБ - ограничение рабочей памяти: 8,00 ГБ

...