проблема при использовании функции фильтра scala в rdd - PullRequest
1 голос
/ 02 октября 2019

Я начал изучать скалы и Apache Spark. У меня есть входной файл, как показано ниже без заголовка.

0,name1,33,385 - first record

1,name2,26,221 - second record

unique-id, name, age, friends

1) при попытке отфильтровать возраст, который не равен 26, приведенный ниже код не работает.

def parseLine(x : String) =
  {
    val line = x.split(",").filter(x => x._2 != "26")

  }

Я такжепопробовал как ниже. в обоих случаях он печатает все значения, включая 26

val friends = line(2).filter(x => x != "26")

2), при попытке с индексом x._3 он говорит, что индекс является исходящим.

val line = x.split(",").filter(x => x._3 != "221")

Почему с индексом 3 здесь возникает проблема?

Ниже приведен полный пример кода.

package learning

import org.apache.spark._
import org.apache.log4j._

object Test1 {
  def main(args : Array[String]): Unit =
  {

   val sc = new SparkContext("local[*]", "Test1")
   val lines = sc.textFile("D:\\SparkScala\\abcd.csv")
    Logger.getLogger("org").setLevel(Level.ERROR)
    val testres = lines.map(parseLine)
    testres.take(10).foreach(println)


  }
  def parseLine(x : String) =
  {
    val line = x.split(",").filter(x => x._2 != "33")
    //val line = x.split(",").filter(x => x._3 != "307")
    val age = line(1)
    val friends = line(3).filter(x => x != "307")
    (age,friends)

  }


}

как отфильтровать по возрасту или друзьям по простомупуть сюда. почему индекс 3 не работает здесь

1 Ответ

1 голос
/ 02 октября 2019

Проблема заключается в том, что вы пытаетесь фильтровать массив, представляющий одну строку, а не RDD, содержащий все строки. Возможная версия может быть следующей (я также создал класс case для хранения данных, поступающих из CSV):

package learning

import org.apache.spark._
import org.apache.log4j._

object Test2 {

  // A structured representation of a CSV line
  case class Person(id: String, name: String, age: Int, friends: Int)

  def main(args : Array[String]): Unit = {

   val sc = new SparkContext("local[*]", "Test1")
   Logger.getLogger("org").setLevel(Level.ERROR)

   sc.textFile("D:\\SparkScala\\abcd.csv") // RDD[String]
     .map(line => parse(line)) // RDD[Person]
     .filter(person => person.age != 26) // filter out people of 26 years old
     .take(10) // collect 10 people from the RDD
     .foreach(println)

  }

  def parse(x : String): Person = {
    // Split the CSV string by comma into an array of strings
    val line = x.split(",")

    // After extracting the fields from the CSV string, create an instance of Person
    Person(id = line(0), name = line(1), age = line(2).toInt, friends = line(3).toInt)
  }
}

Другая возможность - использовать значения flatMap() и Option[]. В этом случае вы можете работать с одной линией напрямую, например:

package learning

import org.apache.spark._
import org.apache.log4j._

object Test3 {

  // A structured representation of a CSV line
  case class Person(id: String, name: String, age: Int, friends: Int)

  def main(args : Array[String]): Unit = {

   val sc = new SparkContext("local[*]", "Test1")
   Logger.getLogger("org").setLevel(Level.ERROR)

   sc.textFile("D:\\SparkScala\\abcd.csv") // RDD[String]
     .flatMap(line => parse(line)) // RDD[Person] -- you don't need to filter anymore, the flatMap does it for you now
     .take(10) // collect 10 people from the RDD
     .foreach(println)

  }

  def parse(x : String): Option[Person] = {
    // Split the CSV string by comma into an array of strings
    val line = x.split(",")

    // After extracting the fields from the CSV string, create an instance of Person only if it's not 26
    line(2) match {
      case "26" => None
      case _ => Some(Person(id = line(0), name = line(1), age = line(2).toInt, friends = line(3).toInt))
    } 
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...