Привет, я новичок в использовании scala.У меня есть два разных файла, которые я уже создал две карты, как мне нужно, следующим образом:
data 1
1 : 2
2 : 1,3,4
3 : 2,4
4 : 2, 3
map1 будет вычислять каждое ключевое вхождение после ":" Выходные данные для map1:
(1, 1)
(2, 3)
(3, 2)
(4, 2)
data 2:
apple
banana
kiwi
orange
strawberry
map2 сообщит положение элемента, и его выходные данные будут:
(1, apple)
(2, banana)
(3, kiwi)
(4, orange)
(5, strawberry)
мне нужно соединить две карты с выводом следующим образом:
(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)
Я могу использовать только org.apache.spark.SparkConf и org.apache.spark.SparkCotext.Вот код, который я использую до сих пор:
val sc = new SparkContext (conf)
val data1 = sc.textFile("input.txt")
val map1 = data1.map(x => x.split(":")(0), x.split(":")(1))).flatMap{case (y,z) => z.split("\\s+").map((y,_)}
.filter(_._2.nonEmpty).sortByKey().countByKey()
val data2 = sc.textFile("input2.txt")
val map2 = data2.zipWithIndex().map{ case(v, index) => (v,index + 1)}
.map(pair => pair.swap)
val merge_map = map2.join(map1)
Я хочу объединить две карты, которые я сделал, но это выдает ошибку вроде этого:
type mismatch;
found: scala.collection.Map[String, Long]
required: org.apache.spark.rdd.RDD[(Long,?)]
Я думал, может бытьМне нужно изменить тип значений в map1 / map2.Есть идеи как это сделать?Спасибо!
СЛЕДУЙТЕ ЗА ВОПРОСАМИ :
Теперь мне нужно создать map3 с теми же данными, которые вычисляли бы вхождение каждого значения в правую часть после ":».И снова присоединитесь к карте 2. Вот выходные данные map3 и результат соединения, который мне нужен для map3 и map2.
output map3:
(1,1)
(2,3)
(3,2)
(4,2)
join map2 & map3:
(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)
Вот код, который я использую:
val map3 = data1.map(x => x.split(":")(0).toLong, x.split(":")(1))).flatMap{case (y,z) => z.split("\\s+").map((_,1)}.reduceByKey(_+_)
val merge_map23 = map2.leftOuterJoin(map3)
Я получил ошибку:
type mismatch;
found: org.apache.spark.rdd.RDD[String, Long]
required: org.apache.spark.rdd.RDD[(Long,?)]
Я уже исправил предыдущий код с ответами ниже, но теперь яполучил эту ошибку.Спасибо