Dask Data Lake - это правильный подход? - PullRequest
0 голосов
/ 18 июня 2020

Итак, я использую Dask для хранения больших объемов данных. Мы получаем около 50 миллионов новых строк данных в день. Не много столбцов в ширину. В настоящее время я храню данные с помощью ddf.to_parquet (long_term_storage_directory). Когда я получаю новые данные, я добавляю их в каталог long_term_storage_directory. Все работает нормально, но медленно.

Используемый индекс - это время, когда я надеялся, что по мере добавления данных они будут просто добавлены в длинный список паркетных файлов в long_term_storage_directory. (long_term_storage_directory также индексируется по тому же полю времени) Я обеспокоен тем, что мой подход в некотором роде ошибочен. Может быть, мне нужно использовать Spark или что-то еще для хранения данных?

Примечание: ddf_new_data индексируется с тем же индексом, что и в ddf_long_term_storage_directory. Я надеялся, что, поскольку новые поступающие данные имеют тот же индекс, что и в настоящее время в long_term_storage_directory, который добавил данные в долгосрочное хранилище данных, будет быстрее.

ddf_long_term_storage_directory = dd.read_parquet(path=long_term_storage_directory, engine='pyarrow')
ddf_new_data = dd.read_parquet(path=directory_to_add_to_long_term_storage, engine='pyarrow')

ddf_new_data = ddf_new_data.set_index(index_name, sorted=False, drop=True)

ddf = dd.concat([ddf_long_term_storage_directory, ddf_new_data], axis=0)
ddf = ddf.repartition(partition_size='200MB') #??? Do I need to do this every time I add new data
ddf.to_parquet(long_term_storage_directory)

1 Ответ

2 голосов
/ 18 июня 2020

Самый простой ответ - не загружать старые data / concat / repartition. Это действительно будет медленнее по мере накопления большего количества данных. Вместо этого просто запишите входящие данные в новый файл с последовательным номером в том же каталоге.

...