After working on this problem for almost a week, I think I have a solution. It isn't as concise as I'd like, but it keeps too much data from being loaded into memory at once. Since I've only run it on my laptop, I'm not 100% sure whether it would actually distribute the tasks across other worker nodes or not.
In the end I moved my data out of feather files and into on-disk bcolz ctables. That let me chunk up my dataframes/tables without the hassle that Dask introduced, and I'm fairly confident I don't need to worry about my machine running out of memory.
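For context, converting each feather file into an on-disk ctable only took a couple of lines. This is a minimal sketch; the file name and rootdir are placeholders rather than my real data:

import bcolz
import pandas as pd

df = pd.read_feather("session_S01.feather")               # placeholder feather file
bcolz.ctable.fromdataframe(df, rootdir="session_S01_ct")  # writes a compressed, chunked ctable to disk

The full example, using generated test data instead of my real files, is below.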
import os

import bcolz
import numpy as np
import pandas as pd

import dask
import dask.dataframe as dd
from dask import delayed
def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
    """
    Iterative smoothed z-score algorithm.
    Implementation of the algorithm from https://stackoverflow.com/a/22640362/6029703
    """
    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])

    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update the rolling average, variance and standard deviation
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
            filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])

    return [labels, avg_filter, std_filter]
def make_example_data():
    # Make example data
    y = np.array(
        [1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
         1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
         2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])

    # simulate data stored in individual files
    df = pd.DataFrame(
        {
            "Time": np.arange(len(y)),
            "y1": y,
            "y2": y * 2,
            "y3": y ** 2,
            "yn": y ** y,
        }
    )

    bigdf = pd.DataFrame()
    for i in range(10):
        _df = df.copy()  # copy so the original frame is not mutated
        # create my partitioning column
        _df["session"] = "S0" + str(i)
        bigdf = pd.concat([bigdf, _df], axis=0)

    # return a normal dataframe that looks similar to a dask dataframe
    return bigdf
def ctable_append(cts):
    """
    Append multiple ctables along axis 0 and clean up their disk entries,
    similar to pd.concat([df1, df2], axis=0).

    :param cts: a string containing the root directory path, or a list of ctables
    :return: the combined ctable
    """
    import shutil

    # check whether we were given a list of ctables or a root directory
    if type(cts) == str:
        cts = bcolz.walk(cts)

    first = True
    for ct in cts:
        if first:
            ct1 = ct
        else:
            ct1.append(ct)
            shutil.rmtree(ct.rootdir)
        first = False
    return ct1
# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

bigdf = make_example_data()

results_df = pd.DataFrame()
columns = list(bigdf.columns)
columns.remove("Time")
columns.remove("session")
for col in columns:
    res1 = bigdf.groupby("session")[col].apply(peak_detection_smoothed_zscore_v2, lag, threshold, influence)
    res1 = pd.concat([pd.DataFrame(a).T for a in res1])
    res1.columns = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
    results_df = pd.concat([results_df, res1], axis=1)

pd_results = pd.concat([bigdf, results_df], axis=1)
bigdf = make_example_data()
sessions = list(set(bigdf['session']))
root_dir = os.path.join(os.getcwd(), 'example_data')

# breaking this example dataset out into something a little more like my real dataset
for session in sessions:
    sdf = bigdf[bigdf['session'] == session]
    sess_dir = os.path.join(root_dir, session)
    bcolz.ctable.fromdataframe(sdf, rootdir=sess_dir)
dnapply_cols = [
    'session',
    'Time',
]  # columns that are not signals to find peaks in

lazy_apply = []
# Apply my function to all the data, adding the extra columns.
# I don't think Dask is really needed here, since I'm not sure it actually distributes the tasks:
# when I ran this on a lot more data, only one or maybe two cores were doing anything.
# That could have been because of the memory cost, but my RAM never went beyond about half used.
for ct in bcolz.walk(root_dir):
    for column in ct.cols.names:
        if column not in dnapply_cols:
            res = delayed(peak_detection_smoothed_zscore_v2)(ct[column], lag, threshold, influence)
            lazy_apply.append(delayed(ct.addcol)(res[0], name=column + "_Signal"))
            lazy_apply.append(delayed(ct.addcol)(res[1], name=column + "_meanFilter"))
            lazy_apply.append(delayed(ct.addcol)(res[2], name=column + "_stdFilter"))

dask.compute(*lazy_apply)

# combine all the per-session ctables into a single ctable
ct1 = ctable_append(root_dir)

# chose a chunksize of 74 because that's about how long each session dataframe was
dd_results = dd.from_bcolz(ct1, chunksize=74)

print(dd_results.head(), dd_results.compute().shape, pd_results.shape)
print("Are my Dask results the same shape as my Pandas results?", dd_results.compute().shape == pd_results.shape)
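The shape check at the end is only a rough sanity check. If you want to compare the actual values, something like the sketch below could work; it assumes the bcolz/Dask result and the pandas result end up with the same column names and that rows can be aligned by session and Time, which the code above doesn't guarantee:

# value-level comparison sketch (assumes matching column names and alignable rows)
dd_df = dd_results.compute().sort_values(["session", "Time"]).reset_index(drop=True)
pd_df = pd_results.sort_values(["session", "Time"]).reset_index(drop=True)

num_cols = [c for c in pd_df.columns if c not in ("session", "Time")]
print("Values match:", np.allclose(dd_df[num_cols].values, pd_df[num_cols].values))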