Возврат строки с наибольшим значением на ключ без потери всей строки в СДР - PullRequest
0 голосов
/ 14 апреля 2020

Я начал играть с pyspark RDD и DF. Со знанием SQL мне было удобно с DF и его SQL модулем. Однако я изо всех сил пытаюсь отфильтровать строки в обычном СДР, не преобразовывая его в DF. В приведенном ниже примере я хочу найти самый высокий третий столбец для первого столбца и вернуть всю строку или только вторую строку и отсортировать ее по первому столбцу. В DF я использовал бы управление окнами по первому столбцу и ранжировал каждую строку, а затем фильтровал строки на основе ранга.

Data = sc.parallelize([((12, u'IL'), -1.4944293272864724),
                       ((10, u'NM'), 14.230100203137535),
                       ((12, u'ND'), -9.687170853837522),
                       ((5, u'MO'), 18.73167803079034),
                       ((12, u'NH'), -3.329505034062821)])

Желаемый результат

Data.collect()
[[5, u'MO', 18.73167803079034], [10, u'NM', 14.230100203137535], [12, u'IL', -1.4944293272864724]]

В качестве альтернативы

Data.collect()
[u'MO', u'NM', u'IL']

Ответы [ 3 ]

0 голосов
/ 14 апреля 2020
Можно использовать метод

sortBy.

Data.sortBy(lambda x: x[1],ascending=False).collect()

Чтобы получить только необходимый столбец, передайте результат sortBy в метод map, чтобы получить только необходимые столбцы.

Data.sortBy(lambda x: x[1],ascending=False).map(lambda x: x[0][1]).collect()

0 голосов
/ 14 апреля 2020

Вы можете использовать reduceByKey(), чтобы найти строку, соответствующую ключу max, а затем использовать sortByKey(), чтобы получить окончательно отсортированный СДР. Здесь пошагово показываются промежуточные результаты:

>>> Data = sc.parallelize([((12, u'IL'), -1.4944293272864724),
...                        ((10, u'NM'), 14.230100203137535),
...                        ((12, u'ND'), -9.687170853837522),
...                        ((5, u'MO'), 18.73167803079034),
...                        ((12, u'NH'), -3.329505034062821)])

Сначала преобразуйте СДР, чтобы первое значение было ключом, а остальные - значением:

>>> rdd1 = Data.map(lambda x: (x[0][0], (x[0][1], x[1])))
>>> pprint(rdd1.collect())
[(12, (u'IL', -1.4944293272864724)),
 (10, (u'NM', 14.230100203137535)),
 (12, (u'ND', -9.687170853837522)),
 (5, (u'MO', 18.73167803079034)),
 (12, (u'NH', -3.329505034062821))]

Используйте reduceByKey(), чтобы получить пару с наибольшим значением для данного ключа:

>>> rdd2 = rdd1.reduceByKey(lambda x, y: x if x[1] > y[1] else y)
>>> pprint(rdd2.collect())
[(5, (u'MO', 18.73167803079034)),
 (10, (u'NM', 14.230100203137535)),
 (12, (u'IL', -1.4944293272864724))]

По совпадению результат уже отсортирован, но не полагайтесь на это:

>>> rdd3 = rdd2.sortByKey()

Сопоставьте с желаемым выходным форматом и соберите:

>>> rdd3.map(lambda x: list((x[0],) + x[1])).collect()
[[5, u'MO', 18.73167803079034], [10, u'NM', 14.230100203137535], [12, u'IL', -1.4944293272864724]]

В одном выражении:

>>> Data.map(lambda x: (x[0][0], (x[0][1], x[1]))) \
...     .reduceByKey(lambda x, y: x if x[1] > y[1] else y) \
...     .sortByKey() \
...     .map(lambda x: list((x[0],) + x[1])) \
...     .collect()
[[5, u'MO', 18.73167803079034], [10, u'NM', 14.230100203137535], [12, u'IL', -1.4944293272864724]]
0 голосов
/ 14 апреля 2020

Для СДР у вас есть специальные операторы, которые могут сделать это, чтобы достичь желаемого без потери содержимого вашего СДР, вы можете действовать следующим образом:

Sorted = Data.sortBy(lambda x: x[1],ascending= False)
Mapped = Sorted.map(lambda x : x[0][1])
Mapped.collect()

Вывод приведенной выше последовательности инструкций будет:

['MO', 'NM', 'IL', 'NH', 'ND']

Вы можете играть со второй инструкцией (оператор карты), однако вы хотите получить любой элемент, а не только те метки, которые вы упомянули.

Если вам нужны только первые три элемента, вместо последней инструкции вы можете использовать:

Mapped.take(3)

В таком случае вы получите:

['MO', 'NM', 'IL']
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...