Как выполнить модульное тестирование на Spark Structured Streaming? - PullRequest
0 голосов
/ 04 июля 2019

Я хотел бы знать о модульном тестировании Spark Structured Streaming. Мой сценарий заключается в том, что я получаю данные от Kafka и использую их с использованием Spark Structured Streaming и применяю некоторые преобразования поверх данных.

Я не уверен, как я могу проверить это с помощью Scala и Spark. Может кто-нибудь сказать мне, как сделать модульное тестирование в структурированном потоке с использованием Scala. Я новичок в потоковой передаче.

1 Ответ

2 голосов
/ 06 июля 2019

tl; dr Используйте MemoryStream для добавления событий и памяти для вывода.

Следующий код должен помочь начать:

import org.apache.spark.sql.execution.streaming.MemoryStream
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val events = MemoryStream[Event]
val sessions = events.toDS
assert(sessions.isStreaming, "sessions must be a streaming Dataset")

// use sessions event stream to apply required transformations
val transformedSessions = ...

val streamingQuery = transformedSessions
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .outputMode(queryOutputMode)
  .start

// Add events to MemoryStream as if they came from Kafka
val batch = Seq(
  eventGen.generate(userId = 1, offset = 1.second),
  eventGen.generate(userId = 2, offset = 2.seconds))
val currentOffset = events.addData(batch)
streamingQuery.processAllAvailable()
events.commit(currentOffset.asInstanceOf[LongOffset])

// check the output
// The output is in queryName table
// The following code simply shows the result
spark
  .table(queryName)
  .show(truncate = false)
...