Я пытаюсь написать модульные тесты в Spark Streaming с DStreams.Вот мой тестовый пример.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.specs2.Specification
import org.specs2.specification.core.SpecStructure
import scala.collection.mutable
case class Visit(userId: Long, page: String, duration: Long) extends Serializable
class MapWithStateTest extends Specification with Serializable {
override def is: SpecStructure = "testing aggregation of Hcom_ClickstreamHit DStream"
val slideDuration = Milliseconds(100)
val conf = new SparkConf()
@transient val sc = new SparkContext(master = "local[*]", appName = "test1", conf = conf)
sc.setLogLevel("Error")
@transient val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val queueOfRDDs = mutable.Queue[RDD[Visit]]()
@transient val streamingContext: StreamingContext = new StreamingContext(sparkContext, slideDuration)
val rddFloodOfQueuesDS: DStream[Visit] = streamingContext.queueStream(queueOfRDDs, oneAtATime = false)
//rddFloodOfQueuesDS.foreachRDD(rdd => rdd.foreach(println))
val visits = Seq(
Visit(1, "home.html", 10),
Visit(2, "cart.html", 5),
Visit(1, "home.html", 10),
Visit(2, "address/shipping.html", 10),
Visit(2, "address/billing.html", 10)
)
visits.foreach(visit => queueOfRDDs += streamingContext.sparkContext.makeRDD(Seq(visit)))
val output = streamingContext.queueStream(queueOfRDDs)
.map(visit => (visit.userId, visit))
.mapWithState(StateSpec.function(TransformFunctions.handleVisit _)
.timeout(Durations.seconds(1)))
println(output.count())
output.count() must_==(2)
streamingContext.checkpoint("/tmp/")
streamingContext.start()
streamingContext.awaitTerminationOrTimeout(10000)
}
Здесь находится вспомогательный объект (создайте отдельно)
import org.apache.spark.streaming.State
object TransformFunctions extends Serializable {
def handleVisit(key: Long, visit: Option[Visit], state: State[Long]): Option[Any] = {
(visit, state.getOption()) match {
case (Some(newVisit), None) => {
// the 1st visit
state.update(newVisit.duration)
None
}
case (Some(newVisit), Some(totalDuration)) => {
// next visit
state.update(totalDuration + newVisit.duration)
None
}
case (None, Some(totalDuration)) => {
// last state - timeout occurred and passed
// value is None in this case
Some(key, totalDuration)
}
case _ => None
}
}
}
Я не получаю сообщение об ошибке, но получаю это сообщение Пустой набор тестов.Пожалуйста, дайте мне знать, если я что-то упустил.Также, пожалуйста, опубликуйте простой пример модульного теста с DStreams.Или укажите мне на любой блог.