Искра Scala совпадает с ошибкой при сравнении значений между двумя парами ключ-значение - PullRequest
0 голосов
/ 18 марта 2020

Моя цель - прочитать 2 файла, отфильтровать стоп-слова, найти общие слова и выбрать меньшее количество слов между этими двумя файлами. После этого я должен отсортировать пары ключ-значение в порядке убывания и показать только первые 15 слов. Поэтому я решил сделать это так:

    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("common words")

    val sc = new SparkContext(conf)

    val stopwords_file = sc.textFile("files/stopwords.txt")
    val file = sc.textFile("files/task1-input1.txt")
    val file2 = sc.textFile("files/task1-input2.txt")

    val stopwords = stopwords_file.collect()

    val counts1 = file.flatMap(line => line.split(" "))
      .map(word => word.toLowerCase())
      .filter(!stopwords.contains(_))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    val counts2 = file2.flatMap(line => line.split(" "))
      .map(word => word.toLowerCase())
      .filter(!stopwords.contains(_))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .join(counts1)
      .collect()
      .map{ //this line gives the error
        case (k, (v1,v2)) if v1 < v2 => (k,v1) // check if count on file1 is smaller then use file1 count for that word, else use file2 count
      }

    val result = sc.parallelize(counts2).sortBy(_._2, false).take(15)

    result.foreach(println)

это дает мне эту ошибку:

Exception in thread "main" scala.MatchError: (http://www.gutenberg.org,(1,1)) (of class scala.Tuple2) at line 30 (the map by case line)

Я не совсем понимаю ошибку, любая помощь или предложение для альтернативного метода будет быть оцененным Я новичок в спринге, у меня было oop Я бы отобразил каждый файл в разные пары значений ключа, затем превратил их во входные данные для функции сокращения и сравнил значения в функции уменьшения, но я не уверен, что мне следует делать в спарке. Спасибо.

Ответы [ 2 ]

0 голосов
/ 18 марта 2020

На самом деле вы выполнили много ненужных заданий, например, собирали RDD. Сбор RDD собирает все данные, распределенные в кластере, на главную машину.

    JavaRDD<String> a = context.textFile("a.txt");
    JavaRDD<String> b = context.textFile("b.txt");
    Set<String> stopwords = null;
    try {
        stopwords = new HashSet(Files.readAllLines(Paths.get("stopwords.txt")));
    } catch (IOException e) {
        e.printStackTrace();
    }
    Set<String> finalStopwords = stopwords;
    JavaPairRDD<String, Integer> aPair = a
            .flatMapToPair(s -> countToOne(s))
            .filter(s -> !finalStopwords.contains(s._1()))
            .reduceByKey((s1, s2) -> s1 + s2);
    JavaPairRDD<String, Integer> bPair = b
            .flatMapToPair(s -> countToOne(s))
            .filter(s -> !finalStopwords.contains(s._1()))
            .reduceByKey((s1, s2) -> s1 + s2);
    List<Tuple2<String, Integer>> collect = aPair.fullOuterJoin(bPair).mapToPair(s -> {
        Integer integer1 = s._2()._2().orNull();
        Integer integer2 = s._2()._1().orNull();
        if (integer1 == null) {
            return new Tuple2<>(s._1(), integer2.intValue());
        } else if (integer2 == null) {
            return new Tuple2<>(s._1(), integer1.intValue());
        }
        if (integer1.intValue() > integer2.intValue()) {
            return new Tuple2<>(s._1(), integer1.intValue());
        }
        return new Tuple2<>(s._1(), integer2.intValue());
    }).collect();
    for (Tuple2<String, Integer> stringIntegerTuple2 : collect) {
        System.out.println(stringIntegerTuple2._1() + "\t" + stringIntegerTuple2._2());
    }
0 голосов
/ 18 марта 2020

Защитное выражение делает ваш шаблон неисчерпывающим. Рассмотрим:

val f: PartialFunction[(String, (Int, Int)), (String, Int)] = {
  case (k, (v1,v2)) if v1 < v2 => (k,v1) 
}

f.isDefinedAt(("foo", (0, 1)))
// Boolean = true

против.

f.isDefinedAt(("foo", (1, 0)))
// Boolean = false

Вы должны сбросить защитное выражение:

val g: PartialFunction[(String, (Int, Int)), (String, Int)] = {
  case (k, (v1,v2)) => if(v1 < v2) (k,v1)  else (k, v2)
}
g.isDefinedAt(("foo", (1, 0)))
// Boolean = true

g.isDefinedAt(("foo", (0, 1)))
// Boolean = true

или указать значение по умолчанию:

val h: PartialFunction[(String, (Int, Int)), (String, Int)] = {
  case (k, (v1,v2)) if v1 < v2 => (k, v1)
  case (k, (_, v2)) => (k, v2)
}

h.isDefinedAt(("foo", (0, 1)))
// Boolean = true

h.isDefinedAt(("foo", (1, 0)))
// Boolean = true
...