У меня есть итеративная функция, которая может быть применена к каждому кадру данных в объекте groupby, возвращая массив той же длины, что и этот кадр данных, для выполнения операции, подобной преобразованию.
Я хотел бы использовать dask распараллелить этот код, но не удалось.
В двух словах вопрос заключается в следующем:
Как использовать dask для распараллеливания следующего кода?
results = []
for ind, group in df.groupby('user')[['location', 'time']]:
results.extend(my_function(group.values))
df['results'] = results
Минимальный воспроизводимый пример
Импорт
import numpy as np
import pandas as pd
Пример данных
df = pd.DataFrame([
(0, 1, 1),
(0, 1, 2),
(0, 1.1, 3),
(0, 2, 4),
(1, 3, 1),
(1, 2, 2),
(1, 2.1, 3),
(1, 1, 4)],
columns=['user', 'location', 'time'])
Итерационная функция
Описание : эта функция проходит местоположения и время пользователя и находит группы точек, где пользователь был неподвижен, то есть не двигался много (мы проверяем, когда пользователь покинул местоположение на np.abs(p_i - p_j) > 0.2
) в течение достаточно длительного времени (np.abs(t_i - t_j) > 1.5
) .
def detect_staypoints(user):
N = len(user)
staypoint = np.zeros(N)
i=0
while i < N - 1:
j = i+1
p_i = user[i][1]
while j < N:
p_j = user[j][1]
# Left the location?
if (np.abs(p_i - p_j) > 0.2) or (j == N-1):
# Been there long enough?
t_i = user[i][2]
t_j = user[j][2]
if np.abs(t_i - t_j) > 1.5:
# Then it is a staypoint
staypoint[i:j] = 1
i = j
break
j = j+1
return staypoint
и код, опять же
staypoint = []
for ind, group in df.groupby('user')[['location', 'time']]:
staypoint.extend(detect_staypoints(group.values))
df['staypoint'] = staypoint