Модульные тесты в Spark Streaming - PullRequest
0 голосов
/ 19 сентября 2018

Я пытаюсь написать модульные тесты в Spark Streaming с DStreams.Вот мой тестовый пример.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.specs2.Specification
import org.specs2.specification.core.SpecStructure

import scala.collection.mutable

case class Visit(userId: Long, page: String, duration: Long) extends Serializable

class MapWithStateTest extends Specification with Serializable {
  override def is: SpecStructure = "testing aggregation of Hcom_ClickstreamHit DStream"


  val slideDuration = Milliseconds(100)
  val conf = new SparkConf()
  @transient val sc = new SparkContext(master = "local[*]", appName = "test1", conf = conf)
  sc.setLogLevel("Error")
  @transient val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate()

  val sparkContext: SparkContext = sparkSession.sparkContext
  val queueOfRDDs = mutable.Queue[RDD[Visit]]()

  @transient val streamingContext: StreamingContext = new StreamingContext(sparkContext, slideDuration)


  val rddFloodOfQueuesDS: DStream[Visit] = streamingContext.queueStream(queueOfRDDs, oneAtATime = false)

  //rddFloodOfQueuesDS.foreachRDD(rdd => rdd.foreach(println))

  val visits = Seq(
    Visit(1, "home.html", 10),
    Visit(2, "cart.html", 5),
    Visit(1, "home.html", 10),
    Visit(2, "address/shipping.html", 10),
    Visit(2, "address/billing.html", 10)
  )

  visits.foreach(visit => queueOfRDDs += streamingContext.sparkContext.makeRDD(Seq(visit)))

  val output = streamingContext.queueStream(queueOfRDDs)
    .map(visit => (visit.userId, visit))
    .mapWithState(StateSpec.function(TransformFunctions.handleVisit _)
      .timeout(Durations.seconds(1)))

  println(output.count())
  output.count() must_==(2)

  streamingContext.checkpoint("/tmp/")
  streamingContext.start()
  streamingContext.awaitTerminationOrTimeout(10000)

}

Здесь находится вспомогательный объект (создайте отдельно)

import org.apache.spark.streaming.State

object TransformFunctions  extends Serializable  {

  def handleVisit(key: Long, visit: Option[Visit], state: State[Long]): Option[Any] = {
    (visit, state.getOption()) match {
      case (Some(newVisit), None) => {
        // the 1st visit
        state.update(newVisit.duration)
        None
      }
      case (Some(newVisit), Some(totalDuration)) => {
        // next visit
        state.update(totalDuration + newVisit.duration)
        None
      }
      case (None, Some(totalDuration)) => {
        // last state - timeout occurred and passed
        // value is None in this case
        Some(key, totalDuration)
      }
      case _ => None
    }
  }

}

Я не получаю сообщение об ошибке, но получаю это сообщение Пустой набор тестов.Пожалуйста, дайте мне знать, если я что-то упустил.Также, пожалуйста, опубликуйте простой пример модульного теста с DStreams.Или укажите мне на любой блог.

...