СДР с (ключ, (ключ2, значение)) - PullRequest
0 голосов
/ 01 января 2019

У меня есть RDD в pyspark вида (ключ, другие вещи), где "другие вещи" - это список полей.Я хотел бы получить другую СДР, которая использует второй ключ из списка полей.Например, если моя первоначальная СДР:

(Пользователь1, 1990 4 2 зеленый ...)
(Пользователь1, 1990 2 2 зеленый ...)
(Пользователь2, 1994 3 8 синий...)
(User1, 1987 3 4 blue ...)

Я хотел бы получить (User1, [(1990, x), (1987, y)]), (User2,(1994 z))

где x, y, z будет агрегацией по другим полям, например, x - это количество, которое может иметь строки, которые у меня есть с User1 и 1990 (два в данном случае), и Iполучить список с одним кортежем в год.

Я смотрю на ключевые значения функций из: https://www.oreilly.com/library/view/learning-spark/9781449359034/ch04.html

Но, похоже, не найти ничего, что даст и агрегацию дважды: один раздля пользователя и один на год.Моя первоначальная попытка была с добавлением комбинироватьByKey (), но я застрял в получении списка значений.

Буду признателен за любую помощь!

1 Ответ

0 голосов
/ 01 января 2019

Вы можете сделать следующее, используя groupby:

# sample rdd
l = [("User1", "1990"), 
     ("User1", "1990"),
     ("User2", "1994"),
     ("User1", "1987") ]

rd = sc.parallelize(l)

# returns a tuples of count of year
def f(l):
    dd = {}
    for i in l:
        if i not in dd:
            dd[i] =1
        else:
            dd[i]+=1
    return list(dd.items())

# using groupby and applying the function on x[1] (which is a list)
rd1 = rd.groupByKey().map(lambda x : (x[0], f(x[1]))).collect()

[('User1', [('1990', 2), ('1987', 1)]), ('User2', [('1994', 1)])]
...