Установка индекса Dask DataFrame для столбца с размером больше доступной памяти - PullRequest
0 голосов
/ 26 апреля 2020

У меня большой паркетный файл (~ 1 ТБ на диске), который я хотел бы обработать с помощью Dask, и 512 ГБ оперативной памяти. Один из этапов обработки требует объединения с меньшим DataFrame. Я хотел бы присоединиться к DataFrames по индексам, так как это должно быть более эффективным. Вот почему я хотел бы установить индекс и сохранить большой файл в паркет.

Объединение больших и маленьких DataFrames здесь не проблема, но тот факт, что я не могу перераспределить большой df (как Рекомендуется .set_index docs ) после выполнения set_index () для несортированного столбца.

Вот что я сейчас делаю:

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np

cluster=LocalCluster(n_workers=20,
        threads_per_worker=4,
        local_directory = "dask_local_dir",
        memory_limit="25GB",
        dashboard_address=":8787")

client = Client(cluster)

df = dd.read_parquet('big.pqt')  # Read the large parquet file

Разделы размером около 600 МБ (Я знаю это из df.memory_usage_per_partition ()). В df есть 12 столбцов:

df.dtypes

выход:

id      int64
a       int64
b       int64
c       int64
d       int64
e       float64
f       float64
g       float64
h       float64
i       float64
j       float64
k       int64
dtype: object

Затем я устанавливаю индекс:

df = df.set_index('id', sorted=False)

Примерно треть / половина пути обработка (я отслеживаю процессы с помощью Dask Dashboard), я начинаю получать много предупреждений оболочки (буквально каждую секунду или около того):

distributed.utils_perf - WARNING - full garbage collections took 40% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 41% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 47% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 42% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 43% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 45% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 43% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 38% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 50% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 49% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 42% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 40% CPU time recently (threshold: 10%)

Установка индекса фактически завершается (и занимает ~ 1ч30м) , Через некоторое время он заметно замедляется, и рабочая память почти заполняется в течение всего процесса.

Затем я пытаюсь перераспределить результаты:

df = df.repartition (partition_size = "512MB")

Через некоторое время я снова получаю тонну сообщений, связанных со сборкой мусора. Панели памяти для рабочих желтые во время выполнения, я предполагаю, что это означает, что память, выделенная рабочему, заполнена на 50% -60%. Процесс почти останавливается. На полпути рабочим не хватает памяти:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<timed exec> in <module>

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/dask/dataframe/core.py in repartition(self, divisions, npartitions, partition_size, freq, force)
   1167 
   1168         if partition_size is not None:
-> 1169             return repartition_size(self, partition_size)
   1170         elif npartitions is not None:
   1171             return repartition_npartitions(self, npartitions)

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/dask/dataframe/core.py in repartition_size(df, size)
   5691     size = int(size)
   5692 
-> 5693     mem_usages = df.map_partitions(total_mem_usage, deep=True).compute()
   5694 
   5695     # 1. split each partition that is larger than partition_size

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439 

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2593                     should_rejoin = False
   2594             try:
-> 2595                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2596             finally:
   2597                 for f in futures.values():

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1892                 direct=direct,
   1893                 local_worker=local_worker,
-> 1894                 asynchronous=asynchronous,
   1895             )
   1896 

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    776         else:
    777             return sync(
--> 778                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    779             )
    780 

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/miniconda3/envs/mybrew/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1751                             exc = CancelledError(key)
   1752                         else:
-> 1753                             raise exception.with_traceback(traceback)
   1754                         raise exc
   1755                     if errors == "skip":

KilledWorker: ("('shuffle-split-dadef103423e7bc8de82c41cf5d3e977', 1, 12, (4, 18, 16))", <Worker 'tcp://127.0.0.1:45707', name: 5, memory: 0, processing: 21253>)

Размер столбца, который я хотел бы использовать в качестве индекса, составляет ~ 715 ГБ (как сообщает df.memory_usage ()). Правильно ли я считаю, что индекс нельзя установить, так как он не помещается в доступную память или есть другая проблема?

...