Как сопоставить RDD [ParentClass] с RDD [Subclass] в apache spark? - PullRequest
3 голосов
/ 26 апреля 2020

Я должен сопоставить rdd с его типами.

trait Fruit

case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends  Fruit

Теперь идет поток данных типа DStream[Fruit]. Это либо Apple, либо Mango.

Как выполнить операцию на основе подкласса? Что-то вроде ниже (что не работает):

dStream.foreachRDD{rdd:RDD[Fruit] =>
     rdd match {
       case rdd: RDD[Apple] =>
         //do something

       case rdd: RDD[Mango] =>
         //do something

       case _ =>
         println(rdd.count() + "<<<< not matched anything")
     }
    }

1 Ответ

2 голосов
/ 28 апреля 2020

Поскольку у нас есть RDD[Fruit], любая строка может быть либо Apple, либо Mango. При использовании foreachRDD каждый RDD будет содержать смесь этих (и, возможно, других) типов.

Чтобы различать различные типы, мы можем использовать collect[U](f: PartialFunction[T, U]): RDD[U] (это не следует путать с collect(): Array[T], который возвращает список с элементами из RDD). Эта функция вернет СДР, который содержит все совпадающие значения, применяя функцию f (в этом случае мы можем использовать сопоставление с шаблоном здесь).

Ниже приведен небольшой иллюстративный пример (добавление Orange к фрукты тоже).

Настройка:

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val inputData: Queue[RDD[Fruit]] = Queue()
val dStream: InputDStream[Fruit] = ssc.queueStream(inputData)

inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))

Создается поток RDD[Fruit] с двумя отдельными RDD с.

dStream.foreachRDD{rdd: RDD[Fruit] =>
  val mix = rdd.collect{
    case row: Apple => ("APPLE", row.price) // do any computation on apple rows
    case row: Mango => ("MANGO", row.price) // do any computation on mango rows
    //case _@row => do something with other rows (will be removed by default).
  }
  mix foreach println
}

В приведенном выше collect мы слегка измените каждую строку (удалив класс), а затем напечатайте получившийся RDD. Результат:

// First RDD
(MANGO,11)
(APPLE,5)
(APPLE,5)

// Second RDD
(MANGO,10)

Как видно, при сопоставлении с образцом строки, содержащие Apple и Mango, были сохранены и изменены при удалении всех классов Orange.


Разделение СДР

При желании также можно разделить два подкласса на их собственные RDD с, как указано ниже. Затем можно выполнить любые вычисления на этих двух RDD с.

val apple = rdd.collect{case row: Apple => row}
val mango = rdd.collect{case row: Mango => row}

Полный пример кода

trait Fruit
case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends  Fruit
case class Orange(price:Int) extends  Fruit

object Test {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    val inputData: Queue[RDD[Fruit]] = Queue()
    val inputStream: InputDStream[Fruit] = ssc.queueStream(inputData)

    inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
    inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))

    inputStream.foreachRDD{rdd:RDD[Fruit] =>
      val mix = rdd.collect{
        case row: Apple => ("APPLE", row.price) // do any computation on apple rows
        case row: Mango => ("MANGO", row.price) // do any computation on mango rows
        //case _@row => do something with other rows (will be removed by default).
      }
      mix foreach println
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
...