У меня есть решение, но требуется время от времени переключаться с rdd
на использование DataFrame
. Самая прямая реализация будет заключаться в прямом использовании DataFrame
data = sc.parallelize([('Manhattan', ('East Village', 2)),
('Manhattan', ('Theater District', 2)),
('Queens', ('Sunnyside', 2)),
('Manhattan', ('Murray Hill', 2)),
('Manhattan', ('Battery Park City', 2)),
('Queens', ('John F. Kennedy International Airport', 2)),
('Queens', ('LaGuardia Airport', 2)),
('Manhattan', ('NoHo', 2)),
('Manhattan', ('Chinatown', 2)),
('Brooklyn', ('Brooklyn Heights', 2))])
Преобразование вашего rdd в (key1_key2, value) формат:
data = data.map(lambda l: (l[0] + "_" + l[1][0], l[1][1]))
data.take(2)
# [('Manhattan_East Village', 2), ('Manhattan_Theater District', 2)]
Затем агрегирование:
data = data.reduceByKey(lambda x,y:x+y)
data.take(2)
# [('Manhattan_Theater District', 2), ('Queens_John F. Kennedy International Airport', 2)]
Разделить, чтобы получить (key1, key2, value) формат:
data2 = data.map(lambda l: (l[0].split("_"), l[1]))
data2 = data2.map(lambda l: (l[0][0], l[0][1], l[1]))
data2.take(2)
# [('Manhattan', 'Theater District', 2), ('Queens', 'John F. Kennedy International Airport', 2)]
Выбор лучших n функций будет проще с DataFrame
API (фактически первая часть была бы проще). Я использую функцию window
:
df = data2.toDF(['district','neighbor','count'])
import pyspark.sql.functions as psf
import pyspark.sql.window as psw
w = psw.Window.partitionBy('district').orderBy(psf.desc('count'))
df = (df.select(psf.col('*'), psf.row_number().over(w).alias('row_number'))
.where(psf.col('row_number') <= 3)
)
df.show(10)
+---------+--------------------+-----+----------+
| district| neighbor|count|row_number|
+---------+--------------------+-----+----------+
| Queens|John F. Kennedy I...| 2| 1|
| Queens| LaGuardia Airport| 2| 2|
| Queens| Sunnyside| 2| 3|
| Brooklyn| Brooklyn Heights| 2| 1|
|Manhattan| Theater District| 2| 1|
|Manhattan| Chinatown| 2| 2|
|Manhattan| Murray Hill| 2| 3|
+---------+--------------------+-----+----------+
Чтобы получить желаемый результат, один из способов сделать это - вернуться к rdd
:
df.rdd.map(lambda l: (l[0], (l[1], l[2]))).reduceByKey(lambda x,y: x + y).take(2)
# [('Manhattan', ('Theater District', 2, 'Chinatown', 2, 'Murray Hill', 2)),
('Brooklyn', ('Brooklyn Heights', 2))]