несоответствие типов в scala при использовании reduByKey - PullRequest
0 голосов
/ 03 мая 2018

Я отдельно проверил свой код ошибки в оболочке scala

scala> val p6 = sc.parallelize(List( ("a","b"),("b","c")))
p6: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val p7 = p6.map(a => ((a._1+a._2), (a._1, a._2, 1)))
p7: org.apache.spark.rdd.RDD[(String, (String, String, Int))] = MapPartitionsRDD[11] at map at <console>:26

scala> val p8 = p7.reduceByKey( (a,b) => (a._1,(a._2, a._3+b._3)))
<console>:28: error: type mismatch;
 found   : (String, (String, Int))
 required: (String, String, Int)
       val p8 = p7.reduceByKey( (a,b) => (a._1,(a._2, a._3+b._3)))

Я хочу использовать a._1 в качестве ключа, чтобы я мог в дальнейшем использовать оператор join, и это должны быть пары (ключ, значение). Но мой вопрос: почему при использовании функции сокращения используется тип required? Я думаю, что формат устанавливается нами, а не регулируется. Я не прав?

Кроме того, если я ошибаюсь, то почему это (String, String, Int) требуется? Почему это не что-то еще?

ps: я знаю, (String, String, Int) - это тип значения в (a._1+a._2), (a._1, a._2, 1)), который является функцией карты, но официальный пример показывает, что функция сокращения (a, b) => (a._1 + b._1, a._2 + b._2) действительна. И я думаю, что все это, включая мой код выше, должно быть действительным

Ответы [ 2 ]

0 голосов
/ 03 мая 2018

ваш p7 имеет p7: org.apache.spark.rdd.RDD[(String, (String, String, Int))], но в вашем reduceByKey вы использовали (a._1,(a._2, a._3+b._3)) типа (String, (String, Int))

Тип вывода p8 также должен быть p8: org.apache.spark.rdd.RDD[(String, (String, String, Int))]

поэтому определение, подобное следующему, должно работать для вас

val p8 = p7.reduceByKey( (a,b) => (a._1, a._2, a._3+b._3))

Вы можете прочитать мой ответ в pyspark для получения более подробной информации о том, какубыслите, как работает lessByKey

и этот тоже должен помочь

0 голосов
/ 03 мая 2018

Посмотрите на типы. Метод сокращения по ключу на RDD[(K, V)] с подписью:

def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

Другими словами, оба входных аргумента и возвращаемый аргумент должны быть одного типа.

В вашем случае p7 - это

RDD[(String, (String, String, Int))]

, где K равно String, а V равно (String, String, Int), поэтому функция, используемая с reduceByKey, должна быть

((String, String, Int), (String, String, Int)) => (String, String, Int)

Допустимая функция:

p7.reduceByKey( (a,b) => (a._1, a._2, a._3 + b._3))

что даст вам

(bc,(b,c,1))
(ab,(a,b,1))

в результате.

Если вы хотите изменить тип в методе byKey, вы должны использовать aggregateByKey или combineByKey.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...