Spark scala: изменение числовых значений в паре ключ-значение на long / integer для объединения двух карт - PullRequest
0 голосов
/ 23 октября 2018

Привет, я новичок в использовании scala.У меня есть два разных файла, которые я уже создал две карты, как мне нужно, следующим образом:

data 1
1 : 2
2 : 1,3,4
3 : 2,4
4 : 2, 3

map1 будет вычислять каждое ключевое вхождение после ":" Выходные данные для map1:

(1, 1)
(2, 3)
(3, 2)
(4, 2)

data 2:

apple
banana
kiwi
orange
strawberry

map2 сообщит положение элемента, и его выходные данные будут:

(1, apple)
(2, banana)
(3, kiwi)
(4, orange)
(5, strawberry)

мне нужно соединить две карты с выводом следующим образом:

(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)

Я могу использовать только org.apache.spark.SparkConf и org.apache.spark.SparkCotext.Вот код, который я использую до сих пор:

    val sc = new SparkContext (conf)
    val data1 = sc.textFile("input.txt")
    val map1 = data1.map(x => x.split(":")(0), x.split(":")(1))).flatMap{case (y,z) => z.split("\\s+").map((y,_)}
.filter(_._2.nonEmpty).sortByKey().countByKey()
    val data2 = sc.textFile("input2.txt")
    val map2 = data2.zipWithIndex().map{ case(v, index) => (v,index + 1)}
.map(pair => pair.swap)

    val merge_map = map2.join(map1)

Я хочу объединить две карты, которые я сделал, но это выдает ошибку вроде этого:

type mismatch; 
found: scala.collection.Map[String, Long] 
required: org.apache.spark.rdd.RDD[(Long,?)]

Я думал, может бытьМне нужно изменить тип значений в map1 / map2.Есть идеи как это сделать?Спасибо!

СЛЕДУЙТЕ ЗА ВОПРОСАМИ :

Теперь мне нужно создать map3 с теми же данными, которые вычисляли бы вхождение каждого значения в правую часть после ":».И снова присоединитесь к карте 2. Вот выходные данные map3 и результат соединения, который мне нужен для map3 и map2.

output map3:

(1,1)
(2,3)
(3,2)
(4,2)

join map2 & map3:

(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)

Вот код, который я использую:

val map3 = data1.map(x => x.split(":")(0).toLong, x.split(":")(1))).flatMap{case (y,z) => z.split("\\s+").map((_,1)}.reduceByKey(_+_)

val merge_map23 = map2.leftOuterJoin(map3)

Я получил ошибку:

type mismatch; 
    found: org.apache.spark.rdd.RDD[String, Long] 
    required: org.apache.spark.rdd.RDD[(Long,?)]

Я уже исправил предыдущий код с ответами ниже, но теперь яполучил эту ошибку.Спасибо

1 Ответ

0 голосов
/ 23 октября 2018

Не используйте countByKey.Используйте reduceByKey:

val map1 = data1.map(x => x.split(":")(0), x.split(":")(1)))
  .flatMap{case (y,z) => z.split("\\s+").map((y,_)}    
  .filter(_._2.nonEmpty).mapValues(_ => 1).reduceByKey(_ + _)

Тогда не используйте collectAsMap:

val map2 = data2.zipWithIndex().map{ case(v, index) => (v,index + 1)}
  .map(pair => pair.swap)

Наконец присоединитесь

map1.join(map2)
...