Это то, что я придумал, используя SinkRef
и SourceRef
:
object TestFlow {
def withProbes[In, Out](implicit actorSystem: ActorSystem,
actorMaterializer: ActorMaterializer)
:(Flow[In, Out, _], TestSubscriber.Probe[In], TestPublisher.Probe[Out]) = {
val f = Flow.fromSinkAndSourceMat(TestSink.probe[In], TestSource.probe[Out])
(Keep.both)
val ((sinkRefFuture, (inProbe, outProbe)), sourceRefFuture) =
StreamRefs.sinkRef[In]()
.viaMat(f)(Keep.both)
.toMat(StreamRefs.sourceRef[Out]())(Keep.both)
.run()
val sinkRef = Await.result(sinkRefFuture, 3.seconds)
val sourceRef = Await.result(sourceRefFuture, 3.seconds)
(Flow.fromSinkAndSource(sinkRef, sourceRef), inProbe, outProbe)
}
}
Это дает мне поток, которым я могу полностью управлять с помощью двух пробников, но я могу передать его клиенту, который подключает источник и приемник позже, так что, похоже, это решит мою проблему.
Полученный Flow
должен использоваться только один раз, поэтому он отличается от обычного Flow
, который является скорее схемой потока и может быть реализован несколько раз. Однако это ограничение относится к потоку веб-сокетов, который я все равно высмеиваю, как описано здесь .
Единственная проблема, с которой я столкнулся, заключается в том, что некоторые предупреждения регистрируются, когда ActorSystem
завершается после теста. Похоже, это связано с косвенностью, введенной SinkRef
и SourceRef
.
Обновление: Я нашел лучшее решение без SinkRef
и SourceRef
, используя mapMaterializedValue()
:
def withProbesFuture[In, Out](implicit actorSystem: ActorSystem,
ec: ExecutionContext)
: (Flow[In, Out, _],
Future[(TestSubscriber.Probe[In], TestPublisher.Probe[Out])]) = {
val (sinkPromise, sourcePromise) =
(Promise[TestSubscriber.Probe[In]], Promise[TestPublisher.Probe[Out]])
val flow =
Flow
.fromSinkAndSourceMat(TestSink.probe[In], TestSource.probe[Out])(Keep.both)
.mapMaterializedValue { case (inProbe, outProbe) =>
sinkPromise.success(inProbe)
sourcePromise.success(outProbe)
()
}
val probeTupleFuture = sinkPromise.future
.flatMap(sink => sourcePromise.future.map(source => (sink, source)))
(flow, probeTupleFuture)
}
Когда тестируемый класс материализует поток, Future
завершается, и я получаю тестовые зонды.