Как отфильтровать rdd по типу данных? - PullRequest
0 голосов
/ 26 января 2019

У меня есть rdd, который я пытаюсь отфильтровать только для типа с плавающей точкой.Spark rdds предоставляет какой-либо способ сделать это?

У меня есть CSV, где мне нужно только плавающие значения больше 40 в новый rdd.Чтобы добиться этого, я проверяю, является ли это экземпляром типа float, и фильтрую их.Когда я фильтрую с !, все строки все еще присутствуют на выходе, а когда я не использую !, вывод будет пустым.

val airports1 = airports.filter(line => !line.split(",")(6).isInstanceOf[Float])
val airports2 = airports1.filter(line => line.split(",")(6).toFloat > 40)

На .toFloat я сталкиваюсь сNumberFormatException, который я пытался обработать в блоке try catch.

1 Ответ

0 голосов
/ 26 января 2019

Поскольку у вас есть простая строка и вы пытаетесь получить из нее значения с плавающей точкой, вы фактически не фильтруете по типу. Но, если они могут быть проанализированы, чтобы плавать вместо этого.
Вы можете сделать это, используя flatMap вместе с Option.

import org.apache.spark.sql.SparkSession
import scala.util.Try

val spark = SparkSession.builder.master("local[*]").appName("Float caster").getOrCreate()
val sc = spark.sparkContext

val data = List("x,10", "y,3.3", "z,a")
val rdd = sc.parallelize(data) // rdd: RDD[String]
val filtered = rdd.flatMap(line => Try(line.split(",")(1).toFloat).toOption) // filtered: RDD[Float]
filtered.collect() // res0: Array[Float] = Array(10.0, 3.3)

Для партии > 40 вы можете либо выполнить другой фильтр после, либо отфильтровать внутренний Option.
(Оба должны выполнять более или менее равные из-за лени искры, поэтому выберите тот, который более понятен для вас) .

// Option 1 - Another filter.
val filtered2 = filtered.filter(x => x > 40)

// Option 2 - Filter the inner option in one step.
val filtered = rdd.flatMap(line => Try(line.split(",")(1).toFloat).toOption.filter(x => x > 40))

Дайте мне знать, если у вас есть какие-либо вопросы.

...