Примените к сгруппированному кадру данных dask для выполнения операции, подобной преобразованию - PullRequest
0 голосов
/ 10 апреля 2020

У меня есть итеративная функция, которая может быть применена к каждому кадру данных в объекте groupby, возвращая массив той же длины, что и этот кадр данных, для выполнения операции, подобной преобразованию.

Я хотел бы использовать dask распараллелить этот код, но не удалось.

В двух словах вопрос заключается в следующем:

Как использовать dask для распараллеливания следующего кода?

results = []
for ind, group in df.groupby('user')[['location', 'time']]:
    results.extend(my_function(group.values))

df['results'] = results

Минимальный воспроизводимый пример

Импорт

import numpy as np
import pandas as pd

Пример данных

df = pd.DataFrame([
     (0, 1, 1),
     (0, 1, 2),
     (0, 1.1, 3),
     (0, 2, 4),
     (1, 3, 1),
     (1, 2, 2),
     (1, 2.1, 3),
     (1, 1, 4)],
    columns=['user', 'location', 'time'])

Итерационная функция

Описание : эта функция проходит местоположения и время пользователя и находит группы точек, где пользователь был неподвижен, то есть не двигался много (мы проверяем, когда пользователь покинул местоположение на np.abs(p_i - p_j) > 0.2) в течение достаточно длительного времени (np.abs(t_i - t_j) > 1.5) .

def detect_staypoints(user):
    N = len(user)
    staypoint = np.zeros(N)

    i=0
    while i < N - 1:
        j = i+1
        p_i = user[i][1]
        while j < N:
            p_j = user[j][1]
            # Left the location?
            if (np.abs(p_i - p_j) > 0.2) or (j == N-1):
                # Been there long enough?
                t_i = user[i][2]
                t_j = user[j][2]
                if np.abs(t_i - t_j) > 1.5:
                    # Then it is a staypoint
                    staypoint[i:j] = 1
                i = j
                break

            j = j+1

    return staypoint

и код, опять же

staypoint = []
for ind, group in df.groupby('user')[['location', 'time']]:
    staypoint.extend(detect_staypoints(group.values))

df['staypoint'] = staypoint
...