Ошибка слияния фреймов данных dask с большими файлами csv - PullRequest
0 голосов
/ 06 мая 2020

Вот упрощенная версия моего кода.

import dask
import dask.dataframe as dask_frame
from dask.distributed import Client, LocalCluster


def main():
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)

    csv_path_one = "" # both have 70 columns and around 70 million rows. at a size of about 25 gigabytes
    csv_path_two = "" 

    # the columns are a mix of ints floats datetimes and strings 
    # almost all string lengths are less than 15 two of the longest string columns have a max length of 70

    left_df = dask_frame.read_csv(csv_path_one, sep="|", quotechar="+", encoding="Latin-1", dtype="object")
    right_df = dask_frame.read_csv(csv_path_one, sep=",", quotechar="\"", encoding="utf-8", dtype="object")

    cand_keys = [""] # I have 3
    merged = dask_frame.merge(left_df, right_df, how='outer', on=cand_keys, suffixes=("_L", "_R"),indicator=True)

    missing_mask = merged._merge != 'both'
    missing_findings: dask_frame.DataFrame = merged.loc[missing_mask, cand_keys + ["_merge"]]

    print(f"Running {client}")
    missing_findings.to_csv("results/findings-*.csv")

    cluster.close()
    client.close()

if __name__ == '__main__':
    main()

Этот пример никогда не заканчивается, dask доходит до определенной части, после чего один или несколько рабочих мгновенно превышают предел памяти, и няня убивает их и откатывает весь прогресс рабочего

Looking на странице диагностики всплески памяти обычно случаются примерно на полпути между задачами произвольного разделения.

Я использую Dask 2.9.1 на Windows. Я могу обновить Dask, но это больно с моей текущей настройкой, и я не знаю, решит ли это мою проблему

1 Ответ

1 голос
/ 08 мая 2020

В обновлении до 2.15 эта проблема исправлена.

...