Как прозрачно связать элемент ввода с элементом вывода - PullRequest
0 голосов
/ 29 ноября 2018

Я получаю Flow<A, B> (причудливый поток / графическая штука, см. https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Flow.html) из некоторого внешнего кода вне моего контроля. Мне нужно обернуть этот поток и выполнить некоторую обработку для каждого элемента ввода икаждый элемент вывода. Я могу легко добиться этого, поместив BidiFlow поверх него следующим образом:

Flow<I, O, Unused> flow = ...; // external source
BidiFlow<I, I, O, O, Unused> bidi = BidiFlow.fromFunctions(i -> preprocess(i), o -> postprocess(o)); // do something on every input and every output
Flow<I, O, Unused> newFlow = bidi.join(flow);

Итак, вот поворот: правильно обработать выводэлемент o, мне нужны входные данные, которые сгенерировали этот выходной элемент.Так как у меня нет контроля над базовым потоком, я не могу реорганизовать его, чтобы он возвращал, например, кортеж входа и выхода.Асинхронный и параллельный характер Akka, я не могу делать какие-либо трюки, такие как хранение входных данных в локальном потоке или статическое поле или что-то подобное.

Поэтому мой вопрос: есть ли какая-то магия Akka Streams, которую я могу применить, чтобы как-то получитьэлемент ввода, который сгенерировал вывод?

Ответы [ 2 ]

0 голосов
/ 06 декабря 2018

Это решение использования этапов GraphDsl, Broadcast и Zip.

  val externalFlow: Flow[Int, String, NotUsed] = Flow[Int].map(i => i.toString + "-external")

  def zipInAndOut[I, O](flow: Flow[I, O, NotUsed]): Flow[I, (I, O), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val broadcast = b.add(Broadcast[I](2))
      val zip = b.add(Zip[I, O])
      val theFlow = b.add(flow)
      broadcast.out(0) ~> zip.in0
      broadcast.out(1) ~> theFlow ~> zip.in1
      new FlowShape(broadcast.in, zip.out)
    })
  }
  Source
    .fromIterator(() => (1 until 10).iterator)
    .via(zipInAndOut(externalFlow))
    .runWith(Sink.foreach(println))

отпечатки

(1,1-external)
(2,2-external)
(3,3-external)
(4,4-external)
(5,5-external)
(6,6-external)
(7,7-external)
(8,8-external)
(9,9-external)
0 голосов
/ 29 ноября 2018

Вы можете использовать Graph Api.Вы можете транслировать свой ввод двум потокам: один для создания вашего процесса и другой для обхода вашего идентификатора.Задача las должна быть молнией этих потоков.Посмотрите на Akka Streams / HTTP: Получить оригинальный запрос от ответа .Может быть, это может помочь.

...