Я пытаюсь написать несколько тестов для написанного мною задания Spark и сталкиваюсь с ошибкой, когда пытаюсь утверждать.
Я сделал следующее (извините за стену кода; я не знаком со Spark и тем, что есть и не нужно тестировать, и это моя первая попытка =]).
- Вычеркни немного логики из моей работы в Spark.
def transform_stream_test(spark: SparkSession, stream: Dataset[Row], topic: String) = {
import spark.implicits._
stream
.select(col("key").cast(StringType).as("memberId"), unix_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss").cast(TimestampType).as("publishDate"), col("value").as[Array[Byte]].as("value"))
.map { entity =>
val schemaRegUrl = //some url
val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegUrl, 100)
val avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient).asInstanceOf[Deserializer[GenericRecord]]
PlanMember(
memberId = entity.getAs[String]("memberId"),
publishDate = entity.getAs[java.sql.Timestamp]("publishDate"),
record = Option(entity.getAs[Array[Byte]]("value")).map(p => avroDeserializer.deserialize(topic, p)))
}
.where(!col("member_id").startsWith("User") and !col("member_id").startsWith("user"))
}
Макет
MemoryStream
типа
PlanMember
для перехода в этот метод.
def createStreamingDataFrame(planMember: PlanMember) : DataFrame = {
val input = new MemoryStream[PlanMember](42, spark.sqlContext)
input.addData(Seq(planMember))
input
.toDS()
.map { m =>
val planMemberTopic = "sometopic"
. . . // some logic to register avro
val serializedPlanMember = avroSerializer.serialize(planMemberTopic, avroRecord)
val timestamp = dateFormat.format(m.event_date)
(m.member_id, timestamp, serializedPlanMember)
}.toDF("key", "timestamp", "value")
}
Написана функция процесса
def process(stream: Dataset[PlanMember]) = {
stream
.writeStream
.format("memory")
.queryName("Output")
.outputMode(OutputMode.Append)
.start()
.processAllAvailable()
}
Написан тест
test("testing main stream datasets") {
//arrange
val planMember = createPlanMember("user", "2018-07-09T09:45:50", "fake-plan-id", "fake-handle", null, null, "2012-03-12T00:29:52.275")
val planMembersStream = createStreamingDataFrame(planMember)
val expectedPlanMembers = . . .
//act
val actualPlanMembers = A.transform_stream_test(spark, planMembersStream, "some topic")
process(actualPlanMembers)
//assert
assertDatasetEquals(actualPlanMembers, expectedPlanMembers)
}
Я использую тестовую библиотеку , написанную holdenk, чтобы помочь мне в тестировании;Я выбрал это, потому что я делаю структурированную потоковую передачу, и это, кажется, поддерживает это.
При выполнении этого теста я получаю следующую ошибку:
Queries with streaming sources must be executed with writeStream.start();
Насколько я понял, я запустил поток в своей функции процесса. Если я уберу утверждение, ошибка исчезнет. Я смог подтвердить данные в памяти с spark.sql("select * from output").show()
У меня такие вопросы:
- Правильно ли я настроил свой тест?
- Мне нужен метод процесса, который я написал?
- Что еще мне может не хватать?
Спасибо, что повесили там через стену кода.