Flow.fromSinkAndSource предоставляет удобный способ собрать flow
, составленный из sink
в качестве его входа и source
в качестве его выхода, которые не подключены, что лучше всего иллюстрируется на примере следующая диаграмма (доступна по ссылке API):
+----------------------------------------------+
| Resulting Flow[I, O, NotUsed] |
| |
| +---------+ +-----------+ |
| | | | | |
I ~~>| Sink[I] | [no-connection!] | Source[O] | ~~> O
| | | | | |
| +---------+ +-----------+ |
+----------------------------------------------+
Как показано в ответе @ gabrielgiussi, он часто используется в тех случаях, когда требуется «переключить» вывод существующего source
(или flow
) на какой-либо другой вывод - для целей тестирования или чего-то еще. Вот упрощенный пример:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
val switchFlow = Flow.fromSinkAndSource( Sink.ignore, Source(List("a", "b", "c")) )
Source(1 to 5).via(switchFlow).runForeach(println)
// res1: scala.concurrent.Future[akka.Done] = Future(Success(Done))
// a
// b
// c
Стоит также отметить, что версия "Mat" метода, fromSinkAndSourceMat , имеет несколько интересных вариантов использования. Например, чтобы использовать half-closed
WebSockets открытым, используйте Source.maybe[T]
, чтобы сохранить Promise[Option[T]]
в качестве материализованного значения, которое будет выполнено, когда кто-то захочет закрыть соединение. Ниже приведен пример кода из соответствующего раздела документа поддержки клиента *1020* Akka-http:
: *1021*
// using Source.maybe materializes into a promise
// which will allow us to complete the source later
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
Sink.foreach[Message](println),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) =
Http().singleWebSocketRequest(
WebSocketRequest("ws://example.com:8080/some/path"),
flow)
// at some later time we want to disconnect
promise.success(None)