Разделить строку дважды и уменьшитьByKey в Scala - PullRequest
0 голосов
/ 19 сентября 2019

У меня есть файл .csv, который я пытаюсь проанализировать с помощью spark.Файл .csv содержит, помимо прочего, список тем и их количество.Темы и их количество разделены символом «,», и все эти темы + счетчики находятся в одной строке, разделенные символом «;»вот так

"topic_1,10;topic_2,12;topic_1,3"

Как видите, некоторые темы находятся в строке несколько раз.

У меня есть rdd, содержащий пары значений ключа некоторой даты и строки темы [date, topicstring]

Что я хочу сделать, это разбить строку на ';'чтобы получить все отдельные темы, затем разделите их на «,» и создайте пару значений ключа из названия и количества тем, которые можно уменьшить на ключ.Для приведенного выше примера это будет

[date, ((topic_1, 13), (topic_2, 12))]

Так что я много играл в Spark, поскольку я новичок в Scala.Я пытался сделать следующее:

val separateTopicsByDate = topicsByDate
  .mapValues(_.split(";").map({case(str) => (str)}))
  .mapValues({case(arr) => arr
    .filter(str => str.split(",").length > 1)
    .map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
  })

Проблема в том, что это возвращает массив кортежей, который я не могу уменьшитьByKey.Когда я разделяю строку на ';'это возвращает массив.Я попытался отобразить это в кортеж (как вы можете видеть из операции map), но это не работает.

Полный код, который я использовал,

val rdd = sc.textFile("./data/segment/*.csv")

val topicsByDate = rdd
  .filter(line => line.split("\t").length > 23)
  .map({case(str) => (str.split("\t")(1), str.split("\t")(23))})
  .reduceByKey(_ + _)

val separateTopicsByDate = topicsByDate
  .mapValues(_.split(";").map({case(str) => (str)}))
  .mapValues({case(arr) => arr
    .filter(str => str.split(",").length > 1)
    .map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
  })

separateTopicsByDate.take(2)

Возвращает

res42: Array[(String, Array[(String, Int)])] = Array((20150219001500,Array((Cecilia Pedraza,91), (Mexico City,110), (Soviet Union,1019), (Dutch Warmbloods,1236), (Jose Luis Vaquero,1413), (National Equestrian Club,1636), (Lenin Park,1776), (Royal Dutch Sport Horse,2075), (North American,2104), (Western Hemisphere,2246), (Maydet Vega,2800), (Mirach Capital Group,110), (Subrata Roy,403), (New York,820), (Saransh Sharma,945), (Federal Bureau,1440), (San Francisco,1482), (Gregory Wuthrich,1635), (San Francisco,1652), (Dan Levine,2309), (Emily Flitter,2327), (K...

Как вы можете видеть, это массив кортежей, который я не могу использовать .reduceByKey (_ + _).

Есть ли способ разбить строку таким образом, чтобы она могла бытьуменьшается на ключ?

1 Ответ

0 голосов
/ 20 сентября 2019

В случае, если в вашем СДР есть строки, такие как:

[date, "topic1,10;topic2,12;topic1,3"]  

, вы можете разделить значения и разбить строку, используя flatMap на:

[date, ["topic1,10", "topic2,12", "topic1,3"]] ->

[date, "topic1,10"]  
[date, "topic2,12"]  
[date, "topic1,3"]

Затем преобразовать каждую строку в[String, Integer] Tuple (rdd1 в коде):

["date_topic1",10]  
["date_topic2",12]  
["date_topic1",3]

и уменьшите ключом, используя сложение (rdd2 в коде):

["date_topic1",13]  
["date_topic2",12]  

Тогда выотделяйте даты от тем и объединяйте темы со значениями, получая кортежи [String, String], например:

["date", "topic1,13"]  
["date", "topic2,12"]  

Наконец, вы разделяете значения на кортежи [topic, count], готовите пары ["date", [(topic,count)]] (rdd3)в коде) и уменьшите ключом (rdd4 в коде), получив:

["date", [(topic1, 13), (topic2, 12)]]

===
ниже приведена реализация Java в виде последовательности из четырех промежуточных СДР, вы можете обратиться кэто для разработки Scala:

    JavaPairRDD<String, String> rdd;     //original data. contains [date, "topic1,10;topic2,12;topic1,3"] 

    JavaPairRDD<String, Integer> rdd1 =  //contains
                                         //["date_topic1",10]  
                                         //["date_topic2",12]  
                                         //["date_topic1",3]


            rdd.flatMapToPair(

                pair -> //pair=[date, "topic1,10;topic2,12;topic1,3"]
                {

                    List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();

                    String k = pair._1; //date
                    String v = pair._2; //"topic,count;topic,count;topic,count"

                    String[] v_splits = v.split(";");

                    for(int i=0; i<v_splits.length; i++)
                    {
                        String[] v_split_topic_count = v_splits[i].split(",");  //"topic,count"

                        list.add(new Tuple2<String,Integer>(k + "_" + v_split_topic_count[0], Integer.parseInt(v_split_topic_count[1]))); //"date_topic,count"
                    }

                    return list.iterator();
                }//end call

            );


    JavaPairRDD<String,Integer> rdd2 = //contains
                                       //["date_topic1",13]  
                                       //["date_topic2",12]  


           rdd1.reduceByKey((Integer i1, Integer i2) -> i1+i2);     


    JavaPairRDD<String,Iterator<Tuple2<String,Integer>>> rdd3 = //contains
                                                                //["date", [(topic1,13)]]  
                                                                //["date", [(topic2,12)]]  

           rdd2.mapToPair(

                pair -> //["date_topic1",13]
                {
                    String k  = pair._1; //date_topic1
                    Integer v = pair._2; //13


                    String[] dateTopicSplits = k.split("_");

                    String new_k = dateTopicSplits[0]; //date                    

                    List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
                    list.add(new Tuple2<String,Integer>(dateTopicSplits[1], v)); //[(topic1,13)]

                    return new Tuple2<String,Iterator<Tuple2<String,Integer>>>(new_k, list.iterator());
                }

           );

    JavaPairRDD<String,Iterator<Tuple2<String,Integer>>> rdd4 = //contains
                                                                //["date", [(topic1, 13), (topic2, 12)]]

            rdd3.reduceByKey(

            (Iterator<Tuple2<String,Integer>> itr1, Iterator<Tuple2<String,Integer>> itr2) ->
            {
               List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();

               while(itr1.hasNext())
                     list.add(itr1.next());

               while(itr2.hasNext())
                     list.add(itr2.next());

               return list.iterator();
            }

            );

UPD.Эту проблему на самом деле можно решить, используя только один map - вы разделяете значение строки (то есть themestring) на ;, так что вы получаете пары [ключ, значение] как [topic, count], и вы заполняете hashmap с помощьюэти пары складывают счет.Наконец, вы выводите ключ date со всеми различными ключами, накопленными в хэш-карте вместе с их значениями.
Этот способ также представляется более эффективным, поскольку размер хеш-карты не будет больше размера оригинала.строка, поэтому пространство памяти, используемое картографом, будет ограничено размером самой большой строки, тогда как в решении для плоской карты память должна быть достаточно большой, чтобы вместить все эти расширенные строки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...