Как объединить кадры данных с dask без нехватки памяти? - PullRequest
0 голосов
/ 08 февраля 2019

Слияние нескольких dask-данных приводит к сбою моего компьютера.

Привет,

Я пытаюсь объединить длинный список CSV-файлов с Dask.Каждый CSV-файл содержит список временных меток, когда переменная изменила свое значение, вместе со значением;Например, для переменной1 мы имеем:

timestamp; value
2016-01-01T00:00:00; 3
2016-01-03T00:00:00; 4

, а для переменной 2 имеем:

timestamp; value
2016-01-02T00:00:00; 8 
2016-01-04T00:00:00; 9

Временные метки в каждом csv могут отличаться (поскольку они связаны с моментом, когда переменная имеетизменилось значение).Как конечный результат, я хочу получить hdf-файл, в котором каждая переменная имеет значение в каждой возникающей временной метке и заполнена вперед.Следовательно, что-то вроде следующего:

timestamp; var1; var2, 
2016-01-01T00:00:00; 3 ; nan
2016-01-02T00:00:00; 3 ; 8
2016-01-03T00:00:00; 4 ; 8
2016-01-04T00:00:00; 4 ; 9

Ниже я приведу метакод, который я использую для выполнения этого анализа и слияния.

# import 
from pathlib import Path
from functools import partial
import import dask.dataframe as dd
import dask.bag as db
from dask import delayed
from dask.diagnostics import ProgressBar

# define how to parse the dates 
def parse_dates(df):
    return pd.to_datetime(df['timestamp'], format='%Y-%m-%dT%H:%M:%S', errors='coerce')

# parse csv files to dask dataframe
def parse_csv2filtered_ddf(fn_file, sourcedir): 
    fn = source_dir.joinpath(fn_tag)
    ddf = dd.read_csv(fn, sep=';', usecols=['timestamp', 'value'], 
                      blocksize=10000000, dtype={'value': 'object'})
    meta = ('timestamp', 'datetime64[ns]')
    ddf['timestamp'] = ddf.map_partitions(parse_dates, meta=meta)
    v = fn_file.split('.csv')[0]
    ddf = ddf.dropna() \
        .rename(columns={'value': v}) \
        .set_index('timestamp')
    return ddf

# define how to merge
def merge_ddf(x, y):
    ddf = x.merge(y, how='outer', left_index=True, right_index=True, npartitions=4)
    return ddf

# set source directory 
source_dir = Path('/path_to_list_of_csv_files/')

# get list of files to parse
lcsv = os.listdir(source_dir)

# make partial function to fix sourcedir  
parse_csv2filtered_ddf_partial = partial(parse_csv2filtered_ddf, source_dir)

# make bag of dataframes
b = db.from_sequence(lcsv).map(parse_csv2filtered_ddf_partial)

# merge all dataframes and reduce to 1 dataframe 
df = b.fold(binop=merge_ddf)

# forward fill the NaNs and drop the remaining
#
# please note that I am choosing here npartitions equal to 48 as 
#  experiments with smaller sets of data allow me to estimate 
#  the output size of the df which should be around 48 GB, hence 
#  chosing 48 should lead to partition of 1 GB, I guess. 
df = delayed(df).repartition(npartitions=48). \
    fillna(method='ffill'). \
    dropna()

# write output to hdf file
df = df.to_hdf(output_fn, '/data')

# start computation
with ProgressBar():
    df.compute(scheduler='threads')

К сожалению, эти сценарии никогда не завершаются успешно.В частности, отслеживая использование памяти, я могу следить за тем, чтобы память полностью заполнялась, после чего происходит сбой компьютера или программы.

Я пытался использовать только один поток в сочетании с несколькими процессами;например,

import dask
dask.config.set(scheduler='single-threaded')

в сочетании с

with ProgressBar():
    df.compute(scheduler='processes', num_workers=3)

также безуспешно.

Любые указатели в правильном направлении приветствуются.

РЕДАКТИРОВАТЬ

Ниже я приведу более краткий скрипт, который должен позволять генерировать аналогичные данные для воспроизведения MemoryError.

import numpy as np
import pandas as pd 
from dask import delayed
from dask import dataframe as dd
from dask import array as da
from dask import bag as db
from dask.diagnostics import ProgressBar
from datetime import datetime
from datetime import timedelta
from functools import partial

def make_ddf(col, values, timestamps):
    n = int(col) % 2
    idx_timestamps = timestamps[n::2]
    df = pd.DataFrame.from_dict({str(col): values, 'timestamp': idx_time})
    ddf = dd.from_pandas(df, chunksize=100000000)
    ddf = ddf.dropna() \
        .set_index('timestamp')
    return ddf

def merge_ddf(x, y):
    ddf = x.merge(y, how='outer', left_index=True, right_index=True, npartitions=4)
    return ddf

N_DF_TO_MERGE = 55  # number of dataframes to merge 
N_PARTITIONS_REPARTITION = 55  

values = np.random.randn(5000000, 1).flatten()   
timestamps = [datetime.now() + timedelta(seconds=i*1) for i in range(10000000)]  
columns = list(range(N_DF_TO_MERGE))

# fix values and times
make_ddf_partial = partial(make_ddf, values=values, timestamps=timestamps)

# make bag
b = db.from_sequence(columns).map(make_ddf_partial)

# merge all dataframes and reduce to one 
df = b.fold(binop=merge_ddf)

# forward fill the NaNs and drop the remaining
df = delayed(df).repartition(npartitions=N_PARTITIONS_REPARTITION). \
    fillna(method='ffill'). \
    dropna()

# write output to hdf file
df = df.to_hdf('magweg.hdf', '/data')

with ProgressBar():
    df.compute(scheduler='threads')

, что приводит к следующей ошибке:

Traceback (последний вызов был последним): файл "mcve.py", строка 63, в файле main () "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ click \ core.py ", строка 764, в вызов , возврат файла self.main (* args, ** kwargs)"C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ click \ core.py", строка 717, в основном файле rv = self.invoke (ctx) "C: \ Users \ tomasvanoyen \ Miniconda3\ envs \ stora \ lib \ site-packages \ click \ core.py ", строка 956, in invoke return ctx.invoke (self.callback, ** ctx.params) Файл" C: \ Users \ tomasvanoyen \ Miniconda3 \ envs\ stora \ lib \ site-packages \ click \ core.py ", строка 555, in invoke return callback (* args, ** kwargs) Файл" mcve.py ", строка 59, в основном df.compute (scheduler = 'нитей ') Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ base.py", строка 156, в compute (result,) = compute (self, traverse = False, ** kwargs) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ base.py ", строка 398, в результатах вычислений = расписание (dsk, keys, ** kwargs) Файл" C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ threadaded.py", строка 76, в файле get pack_exception = pack_exception, ** kwargs) Файл" C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ local.py ", строка 459, в get_async Повышение_экспонированияexc, tb) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ compatibility.py", строка 112, в ререйзе, поднимите файл exc "C: \ Users \ tomasvanoyen \ Miniconda3 \envs \ stora \ lib \ site-packages \ dask \ local.py ", строка 230, в execute_task result = _execute_task (задача, данные) Файл" C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages "\ dask \ core.py ", строка 119, в _execute_task return func (* args2) Файл" C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ utils.py ", строка 697,в вызов return getattr (obj, self.method) (* args, ** kwargs) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ dataframe \ядро.py ", строка 1154, в to_hdf возврат к to_hdf (self, path_or_buf, ключ, режим, добавление, ** kwargs) Файл" C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ dataframe \io \ hdf.py ", строка 227, в to_hdf scheduler = scheduler, ** dask_kwargs) Файл" C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ base.py ", строка 166, в compute_as_if_collection возврат графика (dsk2, keys, ** kwargs)Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ threadaded.py", строка 76, в get pack_exception = pack_exception, ** kwargs) Файл "C: \ Users \ tomasvanoyen \Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ local.py ", строка 459, в get_async повышение_экскрипции (не в т.ч.) Файл" C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages "\ dask \ compatibility.py ", строка 112, при ререйзе exc exc Файл" C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ local.py ", строка 230, в execute_task result =_execute_task (задача, данные) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ core.py", строка 119, в _execute_task возвращает func (* args2) Файл "C: \Пользователи \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ dask \ dataframe \ method.py ", строка 103, в border_slice result = getattr (df, kind) [start: stop] Файл" C: \ Users \ "tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ pandas \ core \ indexing.py ", строка 1500, в getitem вернуть self._getitem_axis (Maybe_cal)lable, axis = axis) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ pandas \ core \ indexing.py", строка 1867, в _getitem_axis возвращает self._get_slice_axis (ключ, axis =ось) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ pandas \ core \ indexing.py", строка 1536, в _get_slice_axis возвращает self._slice (indexer, axis = axis, kind ='iloc') Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ pandas \ core \ indexing.py", строка 151, в _slice возвращает self.obj._slice (obj, axis =axis, kind = kind) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ pandas \ core \ generic.py", строка 3152, в _slice result = self._constructor (self._data.get_slice (slobj, axis = axis)) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ pandas \ core \ internals \ Manager.py", строка 700, в get_slice bm._consolidate_inplace() Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ pandas \ core \ internals \ Manager.py", строка 929, в _consolidate_inplace self.blocks = tuple (_consolidate (self.blocks)) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ pandas \ core \ internals \ Manager.py", строка 1899, в _consolidate _can_consolidate= _can_consolidate) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ pandas \ core \ internals \ blocks.py", строка 3146, в _merge_blocks new_values ​​= np.vstack ([b.valuesдля b в блоках]) Файл "C: \ Users \ tomasvanoyen \ Miniconda3 \ envs \ stora \ lib \ site-packages \ numpy \ core \ shape_base.py", строка 234, в vstack возвращает _nx.concatenate ([atleast_2d (_m) для _m in tup], 0) MemoryError

1 Ответ

0 голосов
/ 20 февраля 2019

Две вещи кажутся странными.

  1. Вы вызываете код dask dataframe из кода dask.bag.
  2. Вы, кажется, вызываете слияние, когда, возможно, вы просто хотите concat?
...