Тестирование заданий Apache Spark - PullRequest
0 голосов
/ 21 октября 2019

Я пытаюсь написать несколько тестов для написанного мною задания Spark и сталкиваюсь с ошибкой, когда пытаюсь утверждать.

Я сделал следующее (извините за стену кода; я не знаком со Spark и тем, что есть и не нужно тестировать, и это моя первая попытка =]).

  1. Вычеркни немного логики из моей работы в Spark.
def transform_stream_test(spark: SparkSession, stream: Dataset[Row], topic: String) = {
    import spark.implicits._
    stream
      .select(col("key").cast(StringType).as("memberId"), unix_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss").cast(TimestampType).as("publishDate"), col("value").as[Array[Byte]].as("value"))
      .map { entity =>
        val schemaRegUrl = //some url
        val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegUrl, 100)
        val avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient).asInstanceOf[Deserializer[GenericRecord]]

        PlanMember(
          memberId = entity.getAs[String]("memberId"),
          publishDate = entity.getAs[java.sql.Timestamp]("publishDate"),
          record = Option(entity.getAs[Array[Byte]]("value")).map(p => avroDeserializer.deserialize(topic, p)))
      }
      .where(!col("member_id").startsWith("User") and !col("member_id").startsWith("user"))  
  }
Макет MemoryStream типа PlanMember для перехода в этот метод.
def createStreamingDataFrame(planMember: PlanMember) : DataFrame = {
    val input = new MemoryStream[PlanMember](42, spark.sqlContext)
    input.addData(Seq(planMember))

    input
      .toDS()
      .map { m =>
      val planMemberTopic = "sometopic"
      . . . // some logic to register avro

      val serializedPlanMember =  avroSerializer.serialize(planMemberTopic, avroRecord)

      val timestamp = dateFormat.format(m.event_date)

      (m.member_id, timestamp, serializedPlanMember)

    }.toDF("key", "timestamp", "value")

  }
Написана функция процесса
def process(stream: Dataset[PlanMember]) = {
    stream
      .writeStream
      .format("memory")
      .queryName("Output")
      .outputMode(OutputMode.Append)
      .start()
      .processAllAvailable()
  }
Написан тест
test("testing main stream datasets") {

    //arrange
    val planMember = createPlanMember("user", "2018-07-09T09:45:50", "fake-plan-id", "fake-handle", null, null, "2012-03-12T00:29:52.275")
    val planMembersStream = createStreamingDataFrame(planMember)
    val expectedPlanMembers = . . . 

    //act
    val actualPlanMembers = A.transform_stream_test(spark, planMembersStream, "some topic")
    process(actualPlanMembers)

    //assert
    assertDatasetEquals(actualPlanMembers, expectedPlanMembers) 

  }

Я использую тестовую библиотеку , написанную holdenk, чтобы помочь мне в тестировании;Я выбрал это, потому что я делаю структурированную потоковую передачу, и это, кажется, поддерживает это.

При выполнении этого теста я получаю следующую ошибку:

Queries with streaming sources must be executed with writeStream.start();

Насколько я понял, я запустил поток в своей функции процесса. Если я уберу утверждение, ошибка исчезнет. Я смог подтвердить данные в памяти с spark.sql("select * from output").show()

У меня такие вопросы:

  1. Правильно ли я настроил свой тест?
  2. Мне нужен метод процесса, который я написал?
  3. Что еще мне может не хватать?

Спасибо, что повесили там через стену кода.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...