У меня большой паркетный файл (~ 1 ТБ на диске), который я хотел бы обработать с помощью Dask, и 512 ГБ оперативной памяти. Один из этапов обработки требует объединения с меньшим DataFrame. Я хотел бы присоединиться к DataFrames по индексам, так как это должно быть более эффективным. Вот почему я хотел бы установить индекс и сохранить большой файл в паркет.
Объединение больших и маленьких DataFrames здесь не проблема, но тот факт, что я не могу перераспределить большой df (как Рекомендуется .set_index docs ) после выполнения set_index () для несортированного столбца.
Вот что я сейчас делаю:
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
cluster=LocalCluster(n_workers=20,
threads_per_worker=4,
local_directory = "dask_local_dir",
memory_limit="25GB",
dashboard_address=":8787")
client = Client(cluster)
df = dd.read_parquet('big.pqt') # Read the large parquet file
Разделы размером около 600 МБ (Я знаю это из df.memory_usage_per_partition ()). В df есть 12 столбцов:
df.dtypes
выход:
id int64
a int64
b int64
c int64
d int64
e float64
f float64
g float64
h float64
i float64
j float64
k int64
dtype: object
Затем я устанавливаю индекс:
df = df.set_index('id', sorted=False)
Примерно треть / половина пути обработка (я отслеживаю процессы с помощью Dask Dashboard), я начинаю получать много предупреждений оболочки (буквально каждую секунду или около того):
distributed.utils_perf - WARNING - full garbage collections took 40% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 41% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 47% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 42% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 43% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 45% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 43% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 38% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 50% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 49% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 42% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 40% CPU time recently (threshold: 10%)
Установка индекса фактически завершается (и занимает ~ 1ч30м) , Через некоторое время он заметно замедляется, и рабочая память почти заполняется в течение всего процесса.
Затем я пытаюсь перераспределить результаты:
df = df.repartition (partition_size = "512MB")
Через некоторое время я снова получаю тонну сообщений, связанных со сборкой мусора. Панели памяти для рабочих желтые во время выполнения, я предполагаю, что это означает, что память, выделенная рабочему, заполнена на 50% -60%. Процесс почти останавливается. На полпути рабочим не хватает памяти:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
---------------------------------------------------------------------------
KilledWorker Traceback (most recent call last)
<timed exec> in <module>
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/dask/dataframe/core.py in repartition(self, divisions, npartitions, partition_size, freq, force)
1167
1168 if partition_size is not None:
-> 1169 return repartition_size(self, partition_size)
1170 elif npartitions is not None:
1171 return repartition_npartitions(self, npartitions)
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/dask/dataframe/core.py in repartition_size(df, size)
5691 size = int(size)
5692
-> 5693 mem_usages = df.map_partitions(total_mem_usage, deep=True).compute()
5694
5695 # 1. split each partition that is larger than partition_size
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
164 dask.base.compute
165 """
--> 166 (result,) = compute(self, traverse=False, **kwargs)
167 return result
168
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
435 keys = [x.__dask_keys__() for x in collections]
436 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437 results = schedule(dsk, keys, **kwargs)
438 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
439
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2593 should_rejoin = False
2594 try:
-> 2595 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2596 finally:
2597 for f in futures.values():
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1892 direct=direct,
1893 local_worker=local_worker,
-> 1894 asynchronous=asynchronous,
1895 )
1896
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
776 else:
777 return sync(
--> 778 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
779 )
780
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
346 if error[0]:
347 typ, exc, tb = error[0]
--> 348 raise exc.with_traceback(tb)
349 else:
350 return result[0]
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/utils.py in f()
330 if callback_timeout is not None:
331 future = asyncio.wait_for(future, callback_timeout)
--> 332 result[0] = yield future
333 except Exception as exc:
334 error[0] = sys.exc_info()
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1751 exc = CancelledError(key)
1752 else:
-> 1753 raise exception.with_traceback(traceback)
1754 raise exc
1755 if errors == "skip":
KilledWorker: ("('shuffle-split-dadef103423e7bc8de82c41cf5d3e977', 1, 12, (4, 18, 16))", <Worker 'tcp://127.0.0.1:45707', name: 5, memory: 0, processing: 21253>)
Размер столбца, который я хотел бы использовать в качестве индекса, составляет ~ 715 ГБ (как сообщает df.memory_usage ()). Правильно ли я считаю, что индекс нельзя установить, так как он не помещается в доступную память или есть другая проблема?