dask, как определить пользовательскую (временную складку) функцию, которая работает параллельно и возвращает фрейм данных с другой формой - PullRequest
0 голосов
/ 28 июня 2018

Я пытаюсь реализовать функцию временного сгиба, которая будет «отображаться» на различные разделы кадра данных dask, который, в свою очередь, изменяет форму рассматриваемого кадра данных (или, альтернативно, создает новый кадр данных с измененной формой). Вот как далеко я продвинулся. Результат 'res', возвращаемый при вычислении, представляет собой список из 3 отложенных объектов. Когда я пытаюсь вычислить каждый из них в цикле (последние две строки кода), это приводит к «TypeError: объект DataFrame не вызывается» После прохождения примеров для map_partitions , я также попытался изменить входной DF (inplace) в функции без возвращаемого значения, что вызывает аналогичную ошибку TypeError с NoneType. Что мне не хватает?

Также, глядя на визуализацию (прилагается) Я чувствую, что есть необходимость сократить индивидуально вычисленные (сложенные) разделы в один DF. Как мне это сделать?

#! /usr/bin/env python

# Start dask scheduler and workers
# dask-scheduler &
# dask-worker --nthreads 1 --nprocs 6 --memory-limit 3GB localhost:8786 --local-directory /dev/shm &

from dask.distributed import Client
from dask.delayed import delayed
import pandas as pd
import numpy as np
import dask.dataframe as dd
import math

foldbucketsecs=30
periodicitysecs=15
secsinday=24 * 60 * 60
chunksizesecs=60 # 1 minute
numts = 5
start = 1525132800 # 01/05
end = 1525132800 + (3 * 60) # 3 minute

c = Client('127.0.0.1:8786')

def fold(df, start, bucket):
    return df

def reduce_folds(df):
    return df

def load(epoch):
    idx = []
    for ts in range(0, chunksizesecs, periodicitysecs):
        idx.append(epoch + ts)
    d = np.random.rand(chunksizesecs/periodicitysecs, numts)
    ts = []
    for i in range(0, numts):
        tsname = "ts_%s" % (i)
        ts.append(tsname)
        gts.append(tsname)
    res = pd.DataFrame(index=idx, data=d, columns=ts, dtype=np.float64)
    res.index = pd.to_datetime(arg=res.index, unit='s')
    return res

gts = []
load(start)
cols = len(gts)

idx1 = pd.DatetimeIndex(start=start, freq=('%sS' % periodicitysecs), end=start+periodicitysecs, dtype='datetime64[s]')
meta = pd.DataFrame(index=idx1[:0], data=[], columns=gts, dtype=np.float64)
dfs = [delayed(load)(fn) for fn in range(start, end, chunksizesecs)]

from_delayed = dd.from_delayed(dfs, meta, 'sorted')

nfolds = int(math.ceil((end - start)/foldbucketsecs))
cprime = nfolds * cols

gtsnew = []

for i in range(0, cprime):
    gtsnew.append("ts_%s,fold=%s" % (i%cols, i/cols))

idx2 = pd.DatetimeIndex(start=start, freq=('%sS' % periodicitysecs), end=start+foldbucketsecs, dtype='datetime64[s]')
meta = pd.DataFrame(index=idx2[:0], data=[], columns=gtsnew, dtype=np.float64)
folded_df = from_delayed.map_partitions(delayed(fold)(from_delayed, start, foldbucketsecs), meta=meta)
result = c.submit(reduce_folds, folded_df)

c.gather(result).visualize(filename='/usr/share/nginx/html/svg/df4.svg')

res = c.gather(result).compute()

for f in res:
    f.compute()

1 Ответ

0 голосов
/ 05 июля 2018

Неважно! Это была моя ошибка, вместо того, чтобы обернуть свою функцию в задержку, я просто передал ее вызову map_partitions, и это сработало.

folded_df = from_delayed.map_partitions(fold, start, foldbucketsecs, nfolds, meta=meta)

...