Поскольку у нас есть RDD[Fruit]
, любая строка может быть либо Apple
, либо Mango
. При использовании foreachRDD
каждый RDD
будет содержать смесь этих (и, возможно, других) типов.
Чтобы различать различные типы, мы можем использовать collect[U](f: PartialFunction[T, U]): RDD[U]
(это не следует путать с collect(): Array[T]
, который возвращает список с элементами из RDD). Эта функция вернет СДР, который содержит все совпадающие значения, применяя функцию f
(в этом случае мы можем использовать сопоставление с шаблоном здесь).
Ниже приведен небольшой иллюстративный пример (добавление Orange
к фрукты тоже).
Настройка:
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val inputData: Queue[RDD[Fruit]] = Queue()
val dStream: InputDStream[Fruit] = ssc.queueStream(inputData)
inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))
Создается поток RDD[Fruit]
с двумя отдельными RDD
с.
dStream.foreachRDD{rdd: RDD[Fruit] =>
val mix = rdd.collect{
case row: Apple => ("APPLE", row.price) // do any computation on apple rows
case row: Mango => ("MANGO", row.price) // do any computation on mango rows
//case _@row => do something with other rows (will be removed by default).
}
mix foreach println
}
В приведенном выше collect
мы слегка измените каждую строку (удалив класс), а затем напечатайте получившийся RDD
. Результат:
// First RDD
(MANGO,11)
(APPLE,5)
(APPLE,5)
// Second RDD
(MANGO,10)
Как видно, при сопоставлении с образцом строки, содержащие Apple
и Mango
, были сохранены и изменены при удалении всех классов Orange
.
Разделение СДР
При желании также можно разделить два подкласса на их собственные RDD
с, как указано ниже. Затем можно выполнить любые вычисления на этих двух RDD
с.
val apple = rdd.collect{case row: Apple => row}
val mango = rdd.collect{case row: Mango => row}
Полный пример кода
trait Fruit
case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends Fruit
case class Orange(price:Int) extends Fruit
object Test {
def main(args: Array[String]) {
val spark = SparkSession.builder.master("local[*]").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val inputData: Queue[RDD[Fruit]] = Queue()
val inputStream: InputDStream[Fruit] = ssc.queueStream(inputData)
inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))
inputStream.foreachRDD{rdd:RDD[Fruit] =>
val mix = rdd.collect{
case row: Apple => ("APPLE", row.price) // do any computation on apple rows
case row: Mango => ("MANGO", row.price) // do any computation on mango rows
//case _@row => do something with other rows (will be removed by default).
}
mix foreach println
}
ssc.start()
ssc.awaitTermination()
}
}