Вот упрощенная версия моего кода.
import dask
import dask.dataframe as dask_frame
from dask.distributed import Client, LocalCluster
def main():
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)
csv_path_one = "" # both have 70 columns and around 70 million rows. at a size of about 25 gigabytes
csv_path_two = ""
# the columns are a mix of ints floats datetimes and strings
# almost all string lengths are less than 15 two of the longest string columns have a max length of 70
left_df = dask_frame.read_csv(csv_path_one, sep="|", quotechar="+", encoding="Latin-1", dtype="object")
right_df = dask_frame.read_csv(csv_path_one, sep=",", quotechar="\"", encoding="utf-8", dtype="object")
cand_keys = [""] # I have 3
merged = dask_frame.merge(left_df, right_df, how='outer', on=cand_keys, suffixes=("_L", "_R"),indicator=True)
missing_mask = merged._merge != 'both'
missing_findings: dask_frame.DataFrame = merged.loc[missing_mask, cand_keys + ["_merge"]]
print(f"Running {client}")
missing_findings.to_csv("results/findings-*.csv")
cluster.close()
client.close()
if __name__ == '__main__':
main()
Этот пример никогда не заканчивается, dask доходит до определенной части, после чего один или несколько рабочих мгновенно превышают предел памяти, и няня убивает их и откатывает весь прогресс рабочего
Looking на странице диагностики всплески памяти обычно случаются примерно на полпути между задачами произвольного разделения.
Я использую Dask 2.9.1 на Windows. Я могу обновить Dask, но это больно с моей текущей настройкой, и я не знаю, решит ли это мою проблему