Перебор по-видимому идентичных массивов dask занимает разное время - PullRequest
0 голосов
/ 11 декабря 2019

Я пытаюсь прочитать смесь файлов (npy, csv и т. Д.) Неизвестного размера, используя Dask. Файлы будут преобразованы в массивы и объединены в один перед выполнением некоторых операций, связанных с нарезкой.

Однако я отмечаю значительные различия в скорости в зависимости от того, как создаются массивы. Рассмотрим два следующих метода создания массива из CSV:

  • с использованием numpy.readtxt и dask.array.from_array,
  • с использованием dask.dataframe.read_csv и dask.dataframe. to_dask_array.

Теперь простая итерация по первому массиву почти в 1000 раз быстрее, чем итерация по второму. Я собираюсь предположить, что это потому, что массив был создан с использованием объекта уже в памяти.

Насколько я понимаю, первый массив состоит из одного фрагмента, и поэтому его итерация выполняется относительно быстро. Однако, даже если я перенесу второй массив в соответствие с первым, скорость итерации существенно не увеличится. Я также заметил, что свойство array.nbytes показывает одинаковое число для обоих массивов, что мне подсказываетчто они оба полностью присутствуют в памяти.

Я ожидал, что, как только я начну перебирать массив, Dask будет читать соответствующие фрагменты, необходимые в память. И поскольку в памяти имеется только один фрагмент (для данного конкретного случая), я ожидаю, что скорости будут сравнимыми, не учитывая затраты на чтение фрагмента в память за один раз. Пожалуйста, помогите мне понять, какую ошибку я делаю в своих рассуждениях здесь.

Ниже приведен минимальный пример, демонстрирующий это поведение [python 3.6.2, numpy 1.17.4, dask 2.9.0]:

import time                                                                                          
import numpy as np                                                                                   
import dask.array as da                                                                              
import dask.dataframe as dd                                                                          

def make_files():                                                                                    
    np.random.random(0)                                                                              
    mat = np.random.random((6000, 784))                                                                
    np.savetxt('data.csv', mat, delimiter=',', header=','.join(str(x) for x in range(784)))          

def from_csv_via_np():                                                                               
    mat = np.loadtxt('data.csv', delimiter=',', skiprows=1)                                          
    arr = da.from_array(mat)                                                                         
    return arr                                                                                       

def from_csv_via_df():                                                                               
    df = dd.read_csv('data.csv')                                                                     
    arr = df.to_dask_array(lengths=True)                                                             
    arr = da.rechunk(arr, (6000, 784))                                                               
    return arr                                                                                       

def benchmark(fn):                                                                                   
    arr = fn()                                                                                       

    iter_start = time.perf_counter()                                                                 
    n_iters = 10                                                                                     
    for i in range(n_iters):                                                                         
        x = arr[i].compute()                                                                         

    iter_elapsed = (time.perf_counter() - iter_start)/n_iters                                        

    print(f"func: {fn.__name__}")                                                                    
    print(f"    array: {repr(arr)}")                                                                 
    print(f"    read: {read_elapsed} seconds")                                                       
    print(f"    iter: {iter_elapsed} seconds")                                                       
    print(f"    size: {arr.nbytes} bytes")                                                           

if __name__ == "__main__":                                                                           
    make_files()                                                                                     
    benchmark(from_csv_via_np)                                                                       
    benchmark(from_csv_via_df)                                                                       

1 Ответ

0 голосов
/ 13 декабря 2019

Полагаю, что сам разобрался с ответом после более тщательного прочтения документации Dask innerals .

Массивы не идентичны внутренне, иЯ понял, посмотрев на базовые графы задач . Их можно построить с помощью следующей команды:

arr.visualize('image.pdf')

Вот график массива с резервной копией , а - график массива с поддержкой данных ,оба получают доступ к первому элементу массивов.

Как видите, первый довольно прост и он происходит из одного фрагмента, который оказывается исходным массивом numpy, хранящимся в памяти. Это также можно проверить, напечатав словарь графа, доступный по адресу:

arr.__dask_graph__().layers

. Напротив, второй граф является относительно сложным и возникает в двух разных задачах read_csv. А поскольку Dask по умолчанию не сохраняет промежуточные результаты , можно с уверенностью предположить, что эти вычисления происходят для каждого вызова .compute (). В некоторых случаях кэширование может помочь решить подобные проблемы.

Таким образом, краткий ответ: массив, исходящий из кадра данных, требует повторных вызовов ввода-вывода, что делает его намного медленнее, чем простотот, который читает непосредственно из оперативной памяти.

...