Я пытаюсь прочитать смесь файлов (npy, csv и т. Д.) Неизвестного размера, используя Dask. Файлы будут преобразованы в массивы и объединены в один перед выполнением некоторых операций, связанных с нарезкой.
Однако я отмечаю значительные различия в скорости в зависимости от того, как создаются массивы. Рассмотрим два следующих метода создания массива из CSV:
- с использованием numpy.readtxt и dask.array.from_array,
- с использованием dask.dataframe.read_csv и dask.dataframe. to_dask_array.
Теперь простая итерация по первому массиву почти в 1000 раз быстрее, чем итерация по второму. Я собираюсь предположить, что это потому, что массив был создан с использованием объекта уже в памяти.
Насколько я понимаю, первый массив состоит из одного фрагмента, и поэтому его итерация выполняется относительно быстро. Однако, даже если я перенесу второй массив в соответствие с первым, скорость итерации существенно не увеличится. Я также заметил, что свойство array.nbytes показывает одинаковое число для обоих массивов, что мне подсказываетчто они оба полностью присутствуют в памяти.
Я ожидал, что, как только я начну перебирать массив, Dask будет читать соответствующие фрагменты, необходимые в память. И поскольку в памяти имеется только один фрагмент (для данного конкретного случая), я ожидаю, что скорости будут сравнимыми, не учитывая затраты на чтение фрагмента в память за один раз. Пожалуйста, помогите мне понять, какую ошибку я делаю в своих рассуждениях здесь.
Ниже приведен минимальный пример, демонстрирующий это поведение [python 3.6.2, numpy 1.17.4, dask 2.9.0]:
import time
import numpy as np
import dask.array as da
import dask.dataframe as dd
def make_files():
np.random.random(0)
mat = np.random.random((6000, 784))
np.savetxt('data.csv', mat, delimiter=',', header=','.join(str(x) for x in range(784)))
def from_csv_via_np():
mat = np.loadtxt('data.csv', delimiter=',', skiprows=1)
arr = da.from_array(mat)
return arr
def from_csv_via_df():
df = dd.read_csv('data.csv')
arr = df.to_dask_array(lengths=True)
arr = da.rechunk(arr, (6000, 784))
return arr
def benchmark(fn):
arr = fn()
iter_start = time.perf_counter()
n_iters = 10
for i in range(n_iters):
x = arr[i].compute()
iter_elapsed = (time.perf_counter() - iter_start)/n_iters
print(f"func: {fn.__name__}")
print(f" array: {repr(arr)}")
print(f" read: {read_elapsed} seconds")
print(f" iter: {iter_elapsed} seconds")
print(f" size: {arr.nbytes} bytes")
if __name__ == "__main__":
make_files()
benchmark(from_csv_via_np)
benchmark(from_csv_via_df)