Я получаю Flow<A, B>
(причудливый поток / графическая штука, см. https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Flow.html) из некоторого внешнего кода вне моего контроля. Мне нужно обернуть этот поток и выполнить некоторую обработку для каждого элемента ввода икаждый элемент вывода. Я могу легко добиться этого, поместив BidiFlow
поверх него следующим образом:
Flow<I, O, Unused> flow = ...; // external source
BidiFlow<I, I, O, O, Unused> bidi = BidiFlow.fromFunctions(i -> preprocess(i), o -> postprocess(o)); // do something on every input and every output
Flow<I, O, Unused> newFlow = bidi.join(flow);
Итак, вот поворот: правильно обработать выводэлемент o
, мне нужны входные данные, которые сгенерировали этот выходной элемент.Так как у меня нет контроля над базовым потоком, я не могу реорганизовать его, чтобы он возвращал, например, кортеж входа и выхода.Асинхронный и параллельный характер Akka, я не могу делать какие-либо трюки, такие как хранение входных данных в локальном потоке или статическое поле или что-то подобное.
Поэтому мой вопрос: есть ли какая-то магия Akka Streams, которую я могу применить, чтобы как-то получитьэлемент ввода, который сгенерировал вывод?