Я пытаюсь реализовать функцию временного сгиба, которая будет «отображаться» на различные разделы кадра данных dask, который, в свою очередь, изменяет форму рассматриваемого кадра данных (или, альтернативно, создает новый кадр данных с измененной формой). Вот как далеко я продвинулся. Результат 'res', возвращаемый при вычислении, представляет собой список из 3 отложенных объектов. Когда я пытаюсь вычислить каждый из них в цикле (последние две строки кода), это приводит к «TypeError: объект DataFrame не вызывается» После прохождения примеров для map_partitions , я также попытался изменить входной DF (inplace) в функции без возвращаемого значения, что вызывает аналогичную ошибку TypeError с NoneType. Что мне не хватает?
Также, глядя на визуализацию (прилагается) Я чувствую, что есть необходимость сократить индивидуально вычисленные (сложенные) разделы в один DF. Как мне это сделать?
#! /usr/bin/env python
# Start dask scheduler and workers
# dask-scheduler &
# dask-worker --nthreads 1 --nprocs 6 --memory-limit 3GB localhost:8786 --local-directory /dev/shm &
from dask.distributed import Client
from dask.delayed import delayed
import pandas as pd
import numpy as np
import dask.dataframe as dd
import math
foldbucketsecs=30
periodicitysecs=15
secsinday=24 * 60 * 60
chunksizesecs=60 # 1 minute
numts = 5
start = 1525132800 # 01/05
end = 1525132800 + (3 * 60) # 3 minute
c = Client('127.0.0.1:8786')
def fold(df, start, bucket):
return df
def reduce_folds(df):
return df
def load(epoch):
idx = []
for ts in range(0, chunksizesecs, periodicitysecs):
idx.append(epoch + ts)
d = np.random.rand(chunksizesecs/periodicitysecs, numts)
ts = []
for i in range(0, numts):
tsname = "ts_%s" % (i)
ts.append(tsname)
gts.append(tsname)
res = pd.DataFrame(index=idx, data=d, columns=ts, dtype=np.float64)
res.index = pd.to_datetime(arg=res.index, unit='s')
return res
gts = []
load(start)
cols = len(gts)
idx1 = pd.DatetimeIndex(start=start, freq=('%sS' % periodicitysecs), end=start+periodicitysecs, dtype='datetime64[s]')
meta = pd.DataFrame(index=idx1[:0], data=[], columns=gts, dtype=np.float64)
dfs = [delayed(load)(fn) for fn in range(start, end, chunksizesecs)]
from_delayed = dd.from_delayed(dfs, meta, 'sorted')
nfolds = int(math.ceil((end - start)/foldbucketsecs))
cprime = nfolds * cols
gtsnew = []
for i in range(0, cprime):
gtsnew.append("ts_%s,fold=%s" % (i%cols, i/cols))
idx2 = pd.DatetimeIndex(start=start, freq=('%sS' % periodicitysecs), end=start+foldbucketsecs, dtype='datetime64[s]')
meta = pd.DataFrame(index=idx2[:0], data=[], columns=gtsnew, dtype=np.float64)
folded_df = from_delayed.map_partitions(delayed(fold)(from_delayed, start, foldbucketsecs), meta=meta)
result = c.submit(reduce_folds, folded_df)
c.gather(result).visualize(filename='/usr/share/nginx/html/svg/df4.svg')
res = c.gather(result).compute()
for f in res:
f.compute()