Какова цель составного потока (из Sink и Source)? - PullRequest
0 голосов
/ 13 апреля 2019

Я пытаюсь понять составной поток (из Sink и Source) с веб-сайта , и они представляют собой следующее:

enter image description here

Может ли кто-нибудь привести пример использования составного потока.
И когда мне его использовать?

Ответы [ 2 ]

1 голос
/ 14 апреля 2019

Flow.fromSinkAndSource предоставляет удобный способ собрать flow, составленный из sink в качестве его входа и source в качестве его выхода, которые не подключены, что лучше всего иллюстрируется на примере следующая диаграмма (доступна по ссылке API):

  +----------------------------------------------+
  | Resulting Flow[I, O, NotUsed]                |
  |                                              |
  |  +---------+                  +-----------+  |
  |  |         |                  |           |  |
I ~~>| Sink[I] | [no-connection!] | Source[O] | ~~> O
  |  |         |                  |           |  |
  |  +---------+                  +-----------+  |
  +----------------------------------------------+

Как показано в ответе @ gabrielgiussi, он часто используется в тех случаях, когда требуется «переключить» вывод существующего source (или flow) на какой-либо другой вывод - для целей тестирования или чего-то еще. Вот упрощенный пример:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

val switchFlow = Flow.fromSinkAndSource( Sink.ignore, Source(List("a", "b", "c")) )

Source(1 to 5).via(switchFlow).runForeach(println)
// res1: scala.concurrent.Future[akka.Done] = Future(Success(Done))
// a
// b
// c

Стоит также отметить, что версия "Mat" метода, fromSinkAndSourceMat , имеет несколько интересных вариантов использования. Например, чтобы использовать half-closed WebSockets открытым, используйте Source.maybe[T], чтобы сохранить Promise[Option[T]] в качестве материализованного значения, которое будет выполнено, когда кто-то захочет закрыть соединение. Ниже приведен пример кода из соответствующего раздела документа поддержки клиента *1020* Akka-http:

: *1021*
// using Source.maybe materializes into a promise
// which will allow us to complete the source later
val flow: Flow[Message, Message, Promise[Option[Message]]] =
  Flow.fromSinkAndSourceMat(
    Sink.foreach[Message](println),
    Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) =
  Http().singleWebSocketRequest(
    WebSocketRequest("ws://example.com:8080/some/path"),
    flow)

// at some later time we want to disconnect
promise.success(None)
1 голос
/ 14 апреля 2019

Может быть, в каком-то сценарии вам просто нужно предоставить поток, а в некоторых случаях вам нужен поток NoOp. Тогда вы могли бы сделать

Flow.fromSinkAndSource(Sink.ignore,Source.empty)

Или игнорировать каждый элемент из источника и использовать другой

Flow.fromSinkAndSource(Sink.ignore,Source.tick(1.second,1.second,"something"))
...