Ищите что-то вроде TestFlow, аналогичного TestSink и TestSource - PullRequest
0 голосов
/ 05 мая 2018

Я пишу класс, который принимает Flow (представляющий разновидность сокета) в качестве аргумента конструктора и позволяет отправлять сообщения и асинхронно ждать соответствующих ответов, возвращая Future. Пример:

class SocketAdapter(underlyingSocket: Flow[String, String, _]) {
    def sendMessage(msg: MessageType): Future[ResponseType]
}

Это не обязательно тривиально, потому что в потоке сокета могут быть другие сообщения, которые не имеют значения, поэтому требуется некоторая фильтрация.

Чтобы протестировать класс, мне нужно предоставить что-то вроде «TestFlow», аналогичное TestSink и TestSource. На самом деле я могу создать поток, комбинируя оба. Однако проблема заключается в том, что я получаю реальные датчики только при материализации, и материализация происходит внутри тестируемого класса.

Проблема аналогична той, которую я описал в этом вопросе . Моя проблема была бы решена, если бы я мог сначала материализовать поток, а затем передать его клиенту для подключения к нему. Опять же, я думаю об использовании MergeHub и BroadcastHub и снова вижу проблему в том, что результирующий поток будет вести себя по-другому, потому что он больше не является линейным.

Может быть, я неправильно понял, как Flow предполагается использовать. Для подачи сообщений в поток при вызове sendMessage() мне все равно нужен определенный тип Source. Может быть, Source.actorRef(...) или Source.queue(...), чтобы я мог передать ActorRef или SourceQueue напрямую. Однако я бы предпочел, чтобы этот выбор был до класса SocketAdapter. Конечно, это относится и к Sink.

Такое ощущение, что это довольно распространенный случай при работе с потоками и сокетами. Если невозможно создать «TestFlow» так, как мне нужно, я также рад некоторым советам о том, как улучшить мой дизайн и сделать его лучше тестируемым.

Обновление: Я просмотрел документацию и нашел SourceRef и SinkRef. Похоже, это может решить мою проблему, но я еще не уверен. Разумно ли использовать их в моем случае или есть какие-то недостатки, например другое поведение в тесте по сравнению с производством, где нет таких ссылок?

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

Это то, что я придумал, используя SinkRef и SourceRef:

object TestFlow {

  def withProbes[In, Out](implicit actorSystem: ActorSystem,
                                   actorMaterializer: ActorMaterializer)
    :(Flow[In, Out, _], TestSubscriber.Probe[In], TestPublisher.Probe[Out]) = {
    val f = Flow.fromSinkAndSourceMat(TestSink.probe[In], TestSource.probe[Out])
            (Keep.both)

    val ((sinkRefFuture, (inProbe, outProbe)), sourceRefFuture) =
      StreamRefs.sinkRef[In]()
        .viaMat(f)(Keep.both)
        .toMat(StreamRefs.sourceRef[Out]())(Keep.both)
        .run()

    val sinkRef = Await.result(sinkRefFuture, 3.seconds)
    val sourceRef = Await.result(sourceRefFuture, 3.seconds)

    (Flow.fromSinkAndSource(sinkRef, sourceRef), inProbe, outProbe)
  }

}

Это дает мне поток, которым я могу полностью управлять с помощью двух пробников, но я могу передать его клиенту, который подключает источник и приемник позже, так что, похоже, это решит мою проблему.

Полученный Flow должен использоваться только один раз, поэтому он отличается от обычного Flow, который является скорее схемой потока и может быть реализован несколько раз. Однако это ограничение относится к потоку веб-сокетов, который я все равно высмеиваю, как описано здесь .

Единственная проблема, с которой я столкнулся, заключается в том, что некоторые предупреждения регистрируются, когда ActorSystem завершается после теста. Похоже, это связано с косвенностью, введенной SinkRef и SourceRef.

Обновление: Я нашел лучшее решение без SinkRef и SourceRef, используя mapMaterializedValue():

def withProbesFuture[In, Out](implicit actorSystem: ActorSystem,
                                       ec: ExecutionContext)
  : (Flow[In, Out, _],
         Future[(TestSubscriber.Probe[In], TestPublisher.Probe[Out])]) = {
    val (sinkPromise, sourcePromise) =
        (Promise[TestSubscriber.Probe[In]], Promise[TestPublisher.Probe[Out]])

    val flow =
      Flow
        .fromSinkAndSourceMat(TestSink.probe[In], TestSource.probe[Out])(Keep.both)
        .mapMaterializedValue { case (inProbe, outProbe) =>
          sinkPromise.success(inProbe)
          sourcePromise.success(outProbe)
          ()
        }

    val probeTupleFuture = sinkPromise.future
        .flatMap(sink => sourcePromise.future.map(source => (sink, source)))
    (flow, probeTupleFuture)
  }

Когда тестируемый класс материализует поток, Future завершается, и я получаю тестовые зонды.

0 голосов
/ 07 мая 2018

Косвенный ответ

Характер вашего вопроса предполагает недостаток дизайна, с которым вы сталкиваетесь во время тестирования. Приведенный ниже ответ не решает проблему, указанную в вашем вопросе, но демонстрирует, как вообще избежать этой ситуации.

Не смешивайте бизнес-логику с кодом Akka

Предположительно, вам нужно проверить Flow, потому что вы смешали значительное количество логики с материализацией. Предположим, вы используете необработанные сокеты для ввода-вывода. Ваш вопрос предполагает, что ваш поток выглядит так:

val socketFlow : Flow[String, String, _] = {
  val socket = new Socket(...)

  //business logic for IO
}

Вам нужна сложная тестовая среда для вашего потока, потому что сам поток также сложен.

Вместо этого вы должны разделить логику на независимую функцию, которая не имеет зависимостей akka:

type MessageProcessor = MessageType => ResponseType

object BusinessLogic {
  val createMessageProcessor : (Socket) => MessageProcessor = {
    //business logic for IO
  } 
}

Теперь ваш поток может быть очень простым:

val socket : Socket = new Socket(...)

val socketFlow = Flow.map(BusinessLogic.createMessageProcessor(socket))

В результате: ваше модульное тестирование может работать исключительно с createMessageProcessor, вам не нужно тестировать akka Flow, потому что это простая фанера вокруг сложной логики, которая тестируется независимо.

Не использовать потоки для параллелизма вокруг 1 элемента

Другая большая проблема с вашим дизайном заключается в том, что SocketAdapter использует поток для обработки всего 1 сообщения за раз. Это невероятно расточительно и ненужно (вы пытаетесь убить комара с помощью танка).

Учитывая разделенную бизнес-логику, ваш адаптер становится намного проще и независимым от akka:

class SocketAdapter(messageProcessor : MessageProcessor) {
  def sendMessage(msg: MessageType): Future[ResponseType] = Future {
    messageProcessor(msg)
  }
}

Обратите внимание, насколько просто использовать Future в некоторых случаях и Flow в других сценариях в зависимости от необходимости. Это связано с тем, что бизнес-логика не зависит от какой-либо среды параллелизма.

...