Как передать данные в виде кортежа в rdd в Spark с помощью Scala - PullRequest
0 голосов
/ 07 января 2019

У меня есть набор координат (x, y) в качестве моих данных в CSV-файле. Я хочу передать эти x и y в RDD [(Double, Double)] как кортежи и назвать его точками. Я пробовал следующее, но по какой-то причине я получаю сообщение об ошибке. «Конструктор не может быть создан для ожидаемого типа, найдено: Array [T], требуется: String».

// Load the data
val data = sc.textFile("data.csv")

// Read the data as an RDD[(Double, Double)]
val points = data.map(line => line.split(",").map{ case Array(x, y) => (x.toDouble, y.toDouble)} )

РЕДАКТИРОВАТЬ: Есть ли способ, которым я могу отфильтровать эти точки, чтобы я мог обрабатывать значения, которые являются нулевыми (если x или y или оба равны нулю в наборе данных)? По сути, я хочу проверить, всегда ли кортеж содержит 2 элемента. Я пробовал что-то вроде этого

val points = data.map(line => line.split(",").filter(!_.isEmpty)).map{ case Array(x, y) => (x.toDouble, y.toDouble)}.filter(_.size > 1)

но я получаю ошибку Несоответствие типов, ожидаемое: (Double, Double) => Boolean, фактическое: (Double, Double) => Любое

Ответы [ 3 ]

0 голосов
/ 07 января 2019

Ваш подход почти правильный, но вы должны использовать:

val points = data.map(line => {
  val Array(x, y) = line.split(",")
  (x.toDouble, y.toDouble)
})

Или альтернативно:

val points = data.map(line => {
  line.split(",") match {
    case Array(x, y) => (x.toDouble, y.toDouble)
  }
})

Проблема вашего подхода в том, что вы звоните map на line.split(","), то есть вы вызываете map на Àrray[String], поэтому вы пытаетесь (шаблон) сопоставить String с Array(x,y)

0 голосов
/ 07 января 2019

Apache spark имеет API для чтения CSV-файла. Я предпочитаю использовать API, а не textFile для чтения CSV-файла, поскольку он обрабатывает пропущенные значения или NULL внутри. Вот содержимое моего data.csv файла:

12,13
12.3,25.6
12.4
,34.5

Требуемый вывод может быть сгенерирован следующим образом:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val scheam = StructType(Array(
      StructField("x",DoubleType,true),
      StructField("y",DoubleType,true)
    ))
val data_df = spark.read.schema(scheam).csv("data.csv")
data_df.show()
+----+----+
|   x|   y|
+----+----+
|12.0|13.0|
|12.3|25.6|
|12.4|null|
|null|34.5|
//converting the data_df dataframe to RDD[Double,Double]
val points_rdd = data_df.rdd.map{case Row(x:Double,y:Double) => (x,y)}

Обработка ноль:

val filterd_data_df = data_df.filter(data_df("x").isNotNull && data_df("y").isNotNull).
                rdd.map{case Row(x:Double,y:Double) => (x,y)}
import spark.implicits._
filterd_data_df.toDF("x", "y").show()
+----+----+
|   x|   y|
+----+----+
|12.0|13.0|
|12.3|25.6|
+----+----+
0 голосов
/ 07 января 2019

Используйте код ниже. Вы должны вызвать вторую карту на выходе split, то есть список массива

// Load the data
      val data = sc.textFile("data.csv")

      // Read the data as an RDD[(Double, Double)]
      val points = data.map(line => line.split(",")).map{ case Array(x, y) => (x.toDouble, y.toDouble)}
...