Агрегация PySpark и сложная схема - PullRequest
0 голосов
/ 16 мая 2018

У меня есть фрейм данных Spark (df1), подобный этому:

deviceid   host      count 
a.b.c.d   0.0.0.0     1
a.b.c.d   1.1.1.1     3
x.y.z     0.0.0.0     2

Я хочу преобразовать его в новый фрейм данных, подобный этому

deviceid   hosts_counts   
a.b.c.d    [(0.0.0.0,1),(1.1.1.1,3)]
x.y.z      [(0.0.0.0,2)]

Я попробовал вот что:

def convertTuple(*data): 
    for k,v in data: 
        return k[0], (k[1],v)  

df2 = df1.map(convertTuple) # zip host and count 

Тогда:

function countReducer(a,b): 
    return a + b
df3 = df2.reduceByKey(countReducer)

Однако, это дает мне такой фрейм данных, и я не знаю, как идти дальше для достижения моей конечной цели:

df3 Screenshot

Редактировать

Мне удалось использовать groupby и collect_list для решения этой проблемы.Сложная часть для агрегирования в кортеже (host,count), вам нужно создать strcut.Вот код:

df = df1.groupby("deviceid").agg(collect_list(struct("domain","count")).alias("domain_count"))

1 Ответ

0 голосов
/ 16 мая 2018

Проблема в том, что вы объединяете кортежи вместе, countReducer не выдаст вам список кортежей.В Python:

(1,2) + (3,4) = (1,2,3,4)

Что вы можете сделать, это преобразовать кортеж в список кортежей (с одним элементом).Это можно сделать с помощью map:

.map(lambda x: (x[0], [ x[1] ]))

Но в этом случае было бы лучше изменить функцию convertTuple, чтобы она возвращала то, что вы хотите:

def convertTuple(*data): 
    for k,v in data: 
        return k[0], [(k[1],v)]

Как примечание, похоже, что вы используете RDD, а не кадры данных.Если вы не используете старую версию Spark, я бы порекомендовал вместо этого перейти на фреймы данных, поскольку с ними немного проще работать.

...