Вы можете использовать reduceByKey()
, чтобы найти строку, соответствующую ключу max, а затем использовать sortByKey()
, чтобы получить окончательно отсортированный СДР. Здесь пошагово показываются промежуточные результаты:
>>> Data = sc.parallelize([((12, u'IL'), -1.4944293272864724),
... ((10, u'NM'), 14.230100203137535),
... ((12, u'ND'), -9.687170853837522),
... ((5, u'MO'), 18.73167803079034),
... ((12, u'NH'), -3.329505034062821)])
Сначала преобразуйте СДР, чтобы первое значение было ключом, а остальные - значением:
>>> rdd1 = Data.map(lambda x: (x[0][0], (x[0][1], x[1])))
>>> pprint(rdd1.collect())
[(12, (u'IL', -1.4944293272864724)),
(10, (u'NM', 14.230100203137535)),
(12, (u'ND', -9.687170853837522)),
(5, (u'MO', 18.73167803079034)),
(12, (u'NH', -3.329505034062821))]
Используйте reduceByKey()
, чтобы получить пару с наибольшим значением для данного ключа:
>>> rdd2 = rdd1.reduceByKey(lambda x, y: x if x[1] > y[1] else y)
>>> pprint(rdd2.collect())
[(5, (u'MO', 18.73167803079034)),
(10, (u'NM', 14.230100203137535)),
(12, (u'IL', -1.4944293272864724))]
По совпадению результат уже отсортирован, но не полагайтесь на это:
>>> rdd3 = rdd2.sortByKey()
Сопоставьте с желаемым выходным форматом и соберите:
>>> rdd3.map(lambda x: list((x[0],) + x[1])).collect()
[[5, u'MO', 18.73167803079034], [10, u'NM', 14.230100203137535], [12, u'IL', -1.4944293272864724]]
В одном выражении:
>>> Data.map(lambda x: (x[0][0], (x[0][1], x[1]))) \
... .reduceByKey(lambda x, y: x if x[1] > y[1] else y) \
... .sortByKey() \
... .map(lambda x: list((x[0],) + x[1])) \
... .collect()
[[5, u'MO', 18.73167803079034], [10, u'NM', 14.230100203137535], [12, u'IL', -1.4944293272864724]]