Добавление столбца в dask dataframe, вычисление его через скользящее окно - PullRequest
1 голос
/ 02 апреля 2019

Предположим, у меня есть следующий код для генерации фиктивного фрейма данных dask:

import pandas as pd
import dask.dataframe as dd
pandas_dataframe = pd.DataFrame({'A' : [0,500,1000], 'B': [-100, 200, 300]  , 'C' : [0,0,1.0] } )    
test_data_frame = dd.from_pandas( pandas_dataframe, npartitions= 1  )

В идеале я хотел бы знать, каков рекомендуемый способ добавления другого столбца во фрейм данных, вычисляя содержимое столбцачерез скользящее окно ленивым образом.

Я предложил следующий подход:

import numpy as np
import dask.delayed as delay

@delay
def coupled_operation_example(dask_dataframe, 
                              list_of_input_lbls, 
                              fcn, 
                              window_size, 
                              init_value, 
                              output_lbl):

    def preallocate_channel_data(vector_length, first_components):
        vector_out = np.zeros(len(dask_dataframe))
        vector_out[0:len(first_components)] = first_components
        return vector_out

    def create_output_signal(relevant_data, fcn, window_size , initiated_vec):

       ## to be written; fcn would be  a fcn accepting the sliding window


    initiatied_vec = preallocate_channel_data(len(dask_dataframe, init_value))
    relevant_data = dask_dataframe[list_of_input_lbls]
    my_output_signal = create_output_signal(relevant_data, fcn, window_size, initiated_vec)

Я писал это, убежденный, что dask dataframe позволит мне кое-что нарезать: они делаютне.Итак, мой первый вариант - извлечь столбцы, участвующие в вычислениях, в виде пустых массивов, но они будут с нетерпением оцениваться.Я думаю, что штраф в производительности будет значительным.На данный момент я создаю dask-фреймы данных из данных h5, используя h5py: так что все лениво, пока я не напишу выходные файлы.

До сих пор я обрабатывал только данные в определенной строке;поэтому я использовал:

 test_data_frame .apply(fcn, axis =1, meta = float)

Я не думаю, что существует эквивалентный функциональный подход для скользящих окон;я прав?Я хотел бы что-то вроде Seq.windowed в F # или Haskell.Любое предложение высоко ценится.

1 Ответ

0 голосов
/ 04 апреля 2019

Я пытался решить это через закрытие. Я опубликую эталонные тесты на некоторых данных, как только я завершу разработку кода. На данный момент у меня есть следующий игрушечный пример, который, кажется, работает: поскольку методы применения dask dataframe, похоже, сохраняют порядок строк.

import numpy as np
import pandas as pd
import dask.dataframe as dd
number_of_components = 30


df = pd.DataFrame(np.random.randint(0,number_of_components,size=(number_of_components, 2)), columns=list('AB'))
my_data_frame = dd.from_pandas(df, npartitions = 1 )


def sumPrevious( previousState ) :

     def getValue(row):
        nonlocal previousState 
        something = row['A'] - previousState 
        previousState = row['A']
        return something

     return getValue


given_func = sumPrevious(1 )
out = my_data_frame.apply(given_func, axis = 1 , meta = float)
df['computed'] = out.compute()

Теперь плохие новости, я попытался абстрагировать их, передавая состояние вокруг и используя скользящее окно любой ширины, через эту новую функцию:

def generalised_coupled_computation(previous_state , coupled_computation, previous_state_update) :

    def inner_function(actual_state):
        nonlocal previous_state
        actual_value = coupled_computation(actual_state , previous_state  )
        previous_state = previous_state_update(actual_state, previous_state)
        return actual_value

    return inner_function

Предположим, мы инициализируем функцию с помощью:

init_state = df.loc[0] 
coupled_computation  = lambda act,prev : act['A'] - prev['A']
new_update = lambda act, prev : act
given_func3 = generalised_coupled_computation(init_state , coupled_computation, new_update )
out3 = my_data_frame.apply(given_func3, axis = 1 , meta = float)

Попробуйте запустить его и будьте готовы к неожиданностям: первый элемент неправильный, возможно, некоторые проблемы с указателем, учитывая странный результат. Любое понимание?

Так или иначе, если кто-то передает примитивные типы, он, кажется, функционирует.


Обновление:

решение в использовании копии:

import copy as copy

def new_update(act, previous):
    return copy.copy(act)

Теперь функции ведут себя как положено; конечно, необходимо адаптировать обновления функций и функцию связанных вычислений, если требуется более связанная логика

...