Spark reduByKey () для возврата составного значения - PullRequest
0 голосов
/ 11 октября 2019

Я новичок в Spark и наткнулся на следующую (возможно, простую) проблему.

У меня есть СДР с элементами ключ-значение, каждое значение является парой (строка, число). Например, пара ключ-значение ('A', ('02', 43)).

Я хочу уменьшить этот СДР, сохранив элементы (ключ и значение целом )с максимальными числами, когда они используют один и тот же ключ.

ReduceByKey () кажется уместным, и я пошел с этим MWE.

sc= spark.sparkContext
rdd = sc.parallelize([
 ('A', ('02', 43)),
 ('A', ('02', 36)),
 ('B', ('02', 306)),
 ('C', ('10', 185))])
rdd.reduceByKey(lambda a,b : max(a[1],b[1])).collect()

, который производит

[('C', ('10', 185)), ('A', 43), ('B', ('02', 306))]

Моя проблемавот что я хотел бы получить:

[('C', ('10', 185)), ('A', ('02', 43)), ('B', ('02', 306))]

то есть я не вижу, как вернуть ('A', ('02', 43)), а не просто ('A', 43).

Ответы [ 2 ]

0 голосов
/ 11 октября 2019

Следующий код находится в Scala, надеюсь, вы можете преобразовать ту же логику в pyspark

val rdd = sparkSession.sparkContext.parallelize(Array(('A', (2, 43)), ('A', (2, 36)), ('B', (2, 306)), ('C', (10, 185))))

val rdd2 = rdd.reduceByKey((a, b) => (Math.max(a._1, b._1), Math.max(a._2, b._2)))

rdd2.collect().foreach(println)

output:

(B,(2,306))
(A,(2,43))
(C,(10,185))
0 голосов
/ 11 октября 2019

Я нашел решение этой простой проблемы. Определите функцию вместо использования встроенной функции для reduByKey (). Это:

def max_compound(a,b):
 if (max(a[1],b[1])==a[1]):
   return a
 else: 
   return b

и вызов:

rdd.reduceByKey(max_compound).collect()
...