В случае, если в вашем СДР есть строки, такие как:
[date, "topic1,10;topic2,12;topic1,3"]
, вы можете разделить значения и разбить строку, используя flatMap
на:
[date, ["topic1,10", "topic2,12", "topic1,3"]] ->
[date, "topic1,10"]
[date, "topic2,12"]
[date, "topic1,3"]
Затем преобразовать каждую строку в[String, Integer] Tuple (rdd1
в коде):
["date_topic1",10]
["date_topic2",12]
["date_topic1",3]
и уменьшите ключом, используя сложение (rdd2
в коде):
["date_topic1",13]
["date_topic2",12]
Тогда выотделяйте даты от тем и объединяйте темы со значениями, получая кортежи [String, String], например:
["date", "topic1,13"]
["date", "topic2,12"]
Наконец, вы разделяете значения на кортежи [topic, count], готовите пары ["date", [(topic,count)]]
(rdd3
)в коде) и уменьшите ключом (rdd4
в коде), получив:
["date", [(topic1, 13), (topic2, 12)]]
===
ниже приведена реализация Java в виде последовательности из четырех промежуточных СДР, вы можете обратиться кэто для разработки Scala:
JavaPairRDD<String, String> rdd; //original data. contains [date, "topic1,10;topic2,12;topic1,3"]
JavaPairRDD<String, Integer> rdd1 = //contains
//["date_topic1",10]
//["date_topic2",12]
//["date_topic1",3]
rdd.flatMapToPair(
pair -> //pair=[date, "topic1,10;topic2,12;topic1,3"]
{
List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
String k = pair._1; //date
String v = pair._2; //"topic,count;topic,count;topic,count"
String[] v_splits = v.split(";");
for(int i=0; i<v_splits.length; i++)
{
String[] v_split_topic_count = v_splits[i].split(","); //"topic,count"
list.add(new Tuple2<String,Integer>(k + "_" + v_split_topic_count[0], Integer.parseInt(v_split_topic_count[1]))); //"date_topic,count"
}
return list.iterator();
}//end call
);
JavaPairRDD<String,Integer> rdd2 = //contains
//["date_topic1",13]
//["date_topic2",12]
rdd1.reduceByKey((Integer i1, Integer i2) -> i1+i2);
JavaPairRDD<String,Iterator<Tuple2<String,Integer>>> rdd3 = //contains
//["date", [(topic1,13)]]
//["date", [(topic2,12)]]
rdd2.mapToPair(
pair -> //["date_topic1",13]
{
String k = pair._1; //date_topic1
Integer v = pair._2; //13
String[] dateTopicSplits = k.split("_");
String new_k = dateTopicSplits[0]; //date
List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
list.add(new Tuple2<String,Integer>(dateTopicSplits[1], v)); //[(topic1,13)]
return new Tuple2<String,Iterator<Tuple2<String,Integer>>>(new_k, list.iterator());
}
);
JavaPairRDD<String,Iterator<Tuple2<String,Integer>>> rdd4 = //contains
//["date", [(topic1, 13), (topic2, 12)]]
rdd3.reduceByKey(
(Iterator<Tuple2<String,Integer>> itr1, Iterator<Tuple2<String,Integer>> itr2) ->
{
List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
while(itr1.hasNext())
list.add(itr1.next());
while(itr2.hasNext())
list.add(itr2.next());
return list.iterator();
}
);
UPD.Эту проблему на самом деле можно решить, используя только один map
- вы разделяете значение строки (то есть themestring) на ;
, так что вы получаете пары [ключ, значение] как [topic, count], и вы заполняете hashmap с помощьюэти пары складывают счет.Наконец, вы выводите ключ date
со всеми различными ключами, накопленными в хэш-карте вместе с их значениями.
Этот способ также представляется более эффективным, поскольку размер хеш-карты не будет больше размера оригинала.строка, поэтому пространство памяти, используемое картографом, будет ограничено размером самой большой строки, тогда как в решении для плоской карты память должна быть достаточно большой, чтобы вместить все эти расширенные строки