Pyspark Средний интервал для RDD - PullRequest
0 голосов
/ 10 ноября 2019

Я пытаюсь использовать PySpark, чтобы найти среднюю разницу между смежным списком кортежей.

Например, если у меня есть RDD, например,

vals = [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]

Я хочу найти среднееразница для каждого ключа.

Например, для значения ключа "2"

Средняя разница будет (абс (110-130) + абс (130-120)) / 2 = 15.

Это мой подход до сих пор. Я пытаюсь изменить средний код расчета, чтобы приспособиться к этому. Но, похоже, это не работает.

from pyspark import SparkContext
aTuple = (0,0)
interval = vals.aggregateByKey(aTuple, lambda a,b: (abs(a[0] - b),a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))
finalResult = interval.mapValues(lambda v: (v[0]/v[1])).collect()

Я хочу сделать это с помощью функций RDD, без Spark SQL или любых других дополнительных пакетов.

Что было бы лучшим способомЧтобы сделать это?

Пожалуйста, дайте мне знать, если у вас есть какие-либо вопросы.

Спасибо за ваше время.

1 Ответ

1 голос
/ 10 ноября 2019

Я придумал наивный подход к этому. Я не уверен, будет ли это работать во всех случаях. Это выглядит примерно так:

Давайте сначала создадим функцию для вычисления скользящего среднего. Пожалуйста, исправьте меня, если это неправильный способ вычисления скользящего среднего.

def get_abs(num_list):
    '''
    >>> get_abs([110, 130, 120])
    15.0
    '''
    acc = 0
    num_pairs = 0
    for i in range(len(num_list)-1):
        acc += abs(num_list[i]-num_list[i+1])
        num_pairs +=1
    return acc/num_pairs

Далее мы распараллеливаем список

>>> vals = [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]
>>> rdd = sc.parallelize(vals)
>>> rdd.collect()
[(2, 110),
 (2, 130),
 (2, 120),
 (3, 200),
 (3, 206),
 (3, 206),
 (4, 150),
 (4, 160),
 (4, 170)]

Затем сгруппируем значения, принадлежащие одному списку.

>>> vals = rdd.groupByKey().mapValues(list)
>>> vals.collect()
[(4, [150, 160, 170]), (2, [110, 130, 120]), (3, [200, 206, 206])]

Тогда нам просто нужно вызвать нашу функцию, которую мы определили выше, чтобы вычислить скользящее среднее по сгруппированным значениям.

>>> vals.mapValues(get_abs).collect()
[(4, 10.0), (2, 15.0), (3, 3.0)]
...