Я пытаюсь выполнить тестовые случаи из spark-testing-base , я пытаюсь выполнить этот тест, но по какой-то причине я не могу заставить его работать, потому что агрегация не происходит.Не уверен, как я могу получить весь DStream для метода испытаний.Любое предложение приветствуется.
import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.streaming.dstream.DStream
import org.scalatest.FunSuite
case class Person(name: String, mark: Int) extends Serializable
case class Total(name: String, total: Int) extends Serializable
class SampleStreamingTest3 extends FunSuite with StreamingSuiteBase {
test("simple test") {
val input = List(List(Person("Mark", 200)),
List(Person("Mark", 300)),
List(Person("Mark", 400)))
val expected = Array(List(Total("Mark",900)))
testOperation[Person, Total](input, Test.convertPersonToMarks _, expected, ordered = false)
}
}
object Test extends Serializable {
def convertPersonToMarks(input: DStream[Person]): DStream[Total] = {
val sparkSession = SparkSession.builder().appName("udf testings")
.master("local[*]")
.getOrCreate()
import sparkSession.implicits._
val output = input.transform { rdd =>
val df = rdd.toDF()
//df.select($"name",$"mark".as("total")).as[Total].rdd
val output = df.groupBy("name").agg(sum("mark").cast("Int").alias("total")).as[Total]
output.rdd
}
output.print()
output
}
}