Модульное тестирование преобразования DStream с помощью Spark Streaming - PullRequest
0 голосов
/ 21 сентября 2018

Я пытаюсь выполнить тестовые случаи из spark-testing-base , я пытаюсь выполнить этот тест, но по какой-то причине я не могу заставить его работать, потому что агрегация не происходит.Не уверен, как я могу получить весь DStream для метода испытаний.Любое предложение приветствуется.

import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.streaming.dstream.DStream
import org.scalatest.FunSuite

case class Person(name: String, mark: Int) extends Serializable

case class Total(name: String, total: Int) extends Serializable

class SampleStreamingTest3 extends FunSuite with StreamingSuiteBase  {


  test("simple test") {
    val input = List(List(Person("Mark", 200)),
                     List(Person("Mark", 300)),
                     List(Person("Mark", 400)))
    val expected = Array(List(Total("Mark",900)))

    testOperation[Person, Total](input, Test.convertPersonToMarks _, expected, ordered = false)

  }
}

object Test extends Serializable {
  def convertPersonToMarks(input: DStream[Person]): DStream[Total] = {
    val sparkSession = SparkSession.builder().appName("udf testings")
      .master("local[*]")
      .getOrCreate()

    import sparkSession.implicits._

    val output = input.transform { rdd =>
      val df = rdd.toDF()

      //df.select($"name",$"mark".as("total")).as[Total].rdd
      val output = df.groupBy("name").agg(sum("mark").cast("Int").alias("total")).as[Total]

      output.rdd
    }

   output.print()

    output

  }
}
...