Я придумал наивный подход к этому. Я не уверен, будет ли это работать во всех случаях. Это выглядит примерно так:
Давайте сначала создадим функцию для вычисления скользящего среднего. Пожалуйста, исправьте меня, если это неправильный способ вычисления скользящего среднего.
def get_abs(num_list):
'''
>>> get_abs([110, 130, 120])
15.0
'''
acc = 0
num_pairs = 0
for i in range(len(num_list)-1):
acc += abs(num_list[i]-num_list[i+1])
num_pairs +=1
return acc/num_pairs
Далее мы распараллеливаем список
>>> vals = [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]
>>> rdd = sc.parallelize(vals)
>>> rdd.collect()
[(2, 110),
(2, 130),
(2, 120),
(3, 200),
(3, 206),
(3, 206),
(4, 150),
(4, 160),
(4, 170)]
Затем сгруппируем значения, принадлежащие одному списку.
>>> vals = rdd.groupByKey().mapValues(list)
>>> vals.collect()
[(4, [150, 160, 170]), (2, [110, 130, 120]), (3, [200, 206, 206])]
Тогда нам просто нужно вызвать нашу функцию, которую мы определили выше, чтобы вычислить скользящее среднее по сгруппированным значениям.
>>> vals.mapValues(get_abs).collect()
[(4, 10.0), (2, 15.0), (3, 3.0)]