PySpark: GroupByKey и получение суммы кортежа кортежа - PullRequest
0 голосов
/ 26 апреля 2020

У меня есть этот набор данных:

[('Manhattan', ('East Village', 2)),
('Manhattan', ('Theater District', 2)),
('Queens', ('Sunnyside', 2)),
('Manhattan', ('Murray Hill', 2)),
('Manhattan', ('Battery Park City', 2)),
('Queens', ('John F. Kennedy International Airport', 2)),
('Queens', ('LaGuardia Airport', 2)),
('Manhattan', ('NoHo', 2)),
('Manhattan', ('Chinatown', 2)),
('Brooklyn', ('Brooklyn Heights', 2))]

Что я хочу сделать, так это то, что для каждого района я хочу получить три верхние окрестности по сумме.

Формат этих данных:

X = (Borough, (Neighborhood, total))

Мой мыслительный процесс заключается в следующем:

Я хочу сделать групповой ключ для этих данных, где я сначала получу все три района, а затем три самые высокие окрестности, поэтому код:

X.groupByKey().mapValues(sum).collect()

Однако, насколько я понимаю, это даст ошибку, потому что второй элемент снова является кортежем, и я хочу получить доступ ко второму элементу этого второго кортежа, который Я не уверен, как это сделать.

Кроме того, таким образом, я бы просто агрегировал данные, поэтому я написал этот фрагмент кода, который даст мне три наибольших окрестности:

def findingLargest(item):
    from heapq import nlargest
    i, j = item
    tops = nlargest(3, j,key=lambda x: x[1])
    return (i, tops)

Итак, последний код, который я мог бы придумать:

X.groupByKey()\
 .map(findingLargest)

Ожидаемый результат:

Borough, Top_1 Neighborhood, Top_1_count, Top_2 Neighborhood, Top_2_count

Любые предложения относительно того, как мне поступить с этим?

1 Ответ

0 голосов
/ 26 апреля 2020

У меня есть решение, но требуется время от времени переключаться с rdd на использование DataFrame. Самая прямая реализация будет заключаться в прямом использовании DataFrame

data = sc.parallelize([('Manhattan', ('East Village', 2)),
('Manhattan', ('Theater District', 2)),
('Queens', ('Sunnyside', 2)),
('Manhattan', ('Murray Hill', 2)),
('Manhattan', ('Battery Park City', 2)),
('Queens', ('John F. Kennedy International Airport', 2)),
('Queens', ('LaGuardia Airport', 2)),
('Manhattan', ('NoHo', 2)),
('Manhattan', ('Chinatown', 2)),
('Brooklyn', ('Brooklyn Heights', 2))])

Преобразование вашего rdd в (key1_key2, value) формат:

data = data.map(lambda l: (l[0] + "_" + l[1][0], l[1][1]))
data.take(2)
# [('Manhattan_East Village', 2), ('Manhattan_Theater District', 2)]

Затем агрегирование:

data = data.reduceByKey(lambda x,y:x+y)
data.take(2)
# [('Manhattan_Theater District', 2), ('Queens_John F. Kennedy International Airport', 2)]

Разделить, чтобы получить (key1, key2, value) формат:

data2 = data.map(lambda l: (l[0].split("_"), l[1]))
data2 = data2.map(lambda l: (l[0][0], l[0][1], l[1]))
data2.take(2)
# [('Manhattan', 'Theater District', 2), ('Queens', 'John F. Kennedy International Airport', 2)]

Выбор лучших n функций будет проще с DataFrame API (фактически первая часть была бы проще). Я использую функцию window:

df = data2.toDF(['district','neighbor','count'])
import pyspark.sql.functions as psf
import pyspark.sql.window as psw

w = psw.Window.partitionBy('district').orderBy(psf.desc('count'))
df = (df.select(psf.col('*'), psf.row_number().over(w).alias('row_number'))
      .where(psf.col('row_number') <= 3)
     )
df.show(10)
+---------+--------------------+-----+----------+
| district|            neighbor|count|row_number|
+---------+--------------------+-----+----------+
|   Queens|John F. Kennedy I...|    2|         1|
|   Queens|   LaGuardia Airport|    2|         2|
|   Queens|           Sunnyside|    2|         3|
| Brooklyn|    Brooklyn Heights|    2|         1|
|Manhattan|    Theater District|    2|         1|
|Manhattan|           Chinatown|    2|         2|
|Manhattan|         Murray Hill|    2|         3|
+---------+--------------------+-----+----------+

Чтобы получить желаемый результат, один из способов сделать это - вернуться к rdd:

df.rdd.map(lambda l: (l[0], (l[1], l[2]))).reduceByKey(lambda x,y: x + y).take(2)
# [('Manhattan', ('Theater District', 2, 'Chinatown', 2, 'Murray Hill', 2)),
 ('Brooklyn', ('Brooklyn Heights', 2))]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...