Тестовый поток Akka Stream, когда Supervision.Resume реализован - PullRequest
0 голосов
/ 30 апреля 2018

Недавно я реализовал поток akka-stream, который анализирует некоторые сообщения json, проверяет наличие заданного ключа (destination_region) и передает на следующий этап класс case, содержащий исходное сообщение и строку destination_region.

Я реализовал пользовательский решатель, чтобы в случае возникновения ошибки синтаксического анализа или ошибки ключа он вызывал Supervision.Resume после регистрации исключения.

Минималистичная реализация будет выглядеть так:

package com.example.stages

import com.example.helpers.EitherHelpers._
/*
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

object EitherHelpers {
  implicit class ErrorEither[L <: Throwable, R](val either: Either[L, R]) extends AnyVal {
    def asFuture: Future[R] = either.fold(Future.failed, Future.successful)
    def asTry: Try[R]       = either.fold(Failure.apply, Success.apply)
  }
}
*/

import scala.concurrent.ExecutionContext

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision

import software.amazon.awssdk.services.sqs.model.Message

import io.circe.parser.parse
import io.circe.{DecodingFailure, ParsingFailure}


object MessageContentValidationFlow {
  def apply()(
      implicit executionContext: ExecutionContext): Flow[Message, MessageWithRegion, NotUsed] = {
    val customDecider: Supervision.Decider = {
      case e @ (_: DecodingFailure | _: ParsingFailure) => {
        println(e)
        Supervision.Resume
      }
      case _ => Supervision.Stop
    }
    Flow[Message]
      .mapAsync[MessageWithRegion](2) { message =>
        println(s"Received message: $message")
        val messageWithRegion = for {
          parsed <- parse(message.body()).asFuture
          region <- parsed.hcursor.downField("destination_region").as[String].asFuture
        } yield { MessageWithRegion(message, region) }
        messageWithRegion
      }
      .withAttributes(supervisionStrategy(customDecider))
  }
}

case class MessageWithRegion(message: Message, region: String)

Мне удалось протестировать случай, когда сообщение является действительным, однако я не имею ни малейшего представления о том, как проверить поток в случае ParsingFailure или DecodingFailure. Я попробовал почти все методы, доступные для sub в приведенной ниже реализации:

package com.example.stages

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import io.circe.generic.JsonCodec, io.circe.syntax._
import io.circe.generic.auto._

import software.amazon.awssdk.services.sqs.model.Message
import org.scalatest.FlatSpec


@JsonCodec case class MessageBody(destination_region: String)

class MessageContentValidationFlowSpecs extends FlatSpec {
    implicit val system       = ActorSystem("MessageContentValidationFlow")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher

    val (pub, sub) = TestSource.probe[Message]
        .via(MessageContentValidationFlow())
        .toMat(TestSink.probe[MessageWithRegion])(Keep.both)
        .run()

    "MessageContentValidationFlow" should "process valid messages" in {
        val validRegion = "eu-west-1"
        val msgBody = MessageBody(validRegion).asJson.toString()
        val validMessage = Message.builder().body(msgBody).build()

        sub.request(n = 1)
        pub.sendNext(validMessage)

        val expectedMessageWithRegion = MessageWithRegion(
          message = validMessage,
          region = validRegion
        )
        assert(sub.requestNext() == expectedMessageWithRegion)
    }

    ignore should "trigger Supervision.Resume with empty messages" in {
      val emptyMessage = Message.builder().body("").build()
      assert(emptyMessage.body() == "")

      sub.request(n = 1)
      pub.sendNext(emptyMessage)
      sub.expectComplete()
    }
}

Кто-нибудь знает, как проверить, что Supervision.Resume был запущен и какое исключение было перехвачено пользовательским решателем?

1 Ответ

0 голосов
/ 30 апреля 2018

Поскольку Supervision.Resume отбрасывает ошибочные элементы и продолжает обрабатывать поток, один из способов проверить эту стратегию надзора - запустить поток, содержащий сочетание «хороших» и «плохих» элементов, и убедиться, что материализованное значение состоит только из «хорошие» элементы. Например:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import org.scalatest._

import scala.concurrent._
import scala.concurrent.duration._

class MyTest extends FlatSpec with Matchers {

  implicit val system = ActorSystem("MyTest")
  implicit val materializer = ActorMaterializer()

  val resumingFlow = Flow[Int].map {
    case 2 => throw new RuntimeException("bad number")
    case i => i
  }.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))

  "resumingFlow" should "drop the number 2" in {
    val result: collection.immutable.Seq[Int] =
      Await.result(Source((1 to 5).toSeq).via(resumingFlow).runWith(Sink.seq), 5.seconds)

    result should be (List(1, 3, 4, 5))
  }
}

В вашем случае создайте поток, содержащий действительные Message объекты и хотя бы один недействительный Message объект.

...