Вычисление двух массивов Dask, замораживающихся в конце - PullRequest
0 голосов
/ 23 декабря 2018

Я сгенерировал два случайных массива сумок длиной 450 000 000, которые я хочу разделить друг на друга.Когда я их вычисляю, в конце вычисления всегда останавливаются.

У меня запущен 8-ядерный экземпляр на 32 ГБ для запуска кода.

Я попробовал приведенный ниже код и некоторые модификации, которыеЯ пытался не сохранять данные в х или у.

x = da.random.random(450000000, chunks=(10000,))
x = client.persist(x)
z1 = dd.from_array(x)

y = da.random.random(450000000, chunks=(10000,))
y = client.persist(y)
z2 = dd.from_array(y)

flux_ratio_sq = z1.div(z2)
flux_ratio_sq.compute() 

Фактические результаты, которые я получаю, состоят в том, что персистент хранит в памяти значения x и y (всего 8 ГБ памяти), которые ожидаются, а затем вычисление добавляет больше памяти.Некоторые ошибки, которые я получаю, приведены ниже.

Множество этих ошибок:

distributed.core - INFO - Event loop was unresponsive in Scheduler for 
3.74s.  This is often caused by long-running GIL-holding functions 
or moving large chunks of data. This can cause timeouts and instability.

tornado.application - ERROR - Exception in callback <bound method 
BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado 
object at 0x7fb48562a4a8>>

raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

Я хочу, чтобы конечный результат был в серии dask, чтобы я мог объединить его с моими существующими данными.

1 Ответ

0 голосов
/ 24 декабря 2018

Я постараюсь расширить свой комментарий здесь.Кулак: если numpy работает лучше, чем pandas (DataFrame или Series), то лучше выполнить вычисления с использованием numpy, а затем добавить результат к DataFrame, если это необходимо.С Dask это точно так же.Во-вторых, следуя документации , вы должны продолжать работу только в том случае, если вам нужно вызывать один и тот же фрейм данных несколько раз.

Так что для вашей конкретной проблемы вы можете сделать:

import dask.array as da
N = int(4.5e7)

x = da.random.random(N, chunks=(10000,))
y = da.random.random(N, chunks=(10000,))
flux_ratio_sq = da.divide(x, y).compute()

Приложение: с dask.dataframe вы можете использовать to_parquet() вместо compute() и сохранить результаты в файл.В смущающе параллельных задачах, подобных этой, влияние на оперативную память меньше, чем при использовании compute().Будет интересно узнать, можно ли применить нечто подобное в случае dask.array

...