Возможно ли в Akka создать функцию Flow from Source => Source, как в RxJava / Reactor compose? - PullRequest
1 голос
/ 07 февраля 2020

Я до сих пор не нашел простой способ сделать «фильтрующий» поток в AkkaStreams. Выполнение потока «сопоставления» для меня понятно с помощью функции fromFunction, но фильтрация - нет. В RxJava / Reactor в Flowable / Observable есть оператор compose, который переводит функцию из Flowable в другую Flowable, поэтому преобразование можно описать как цепочку операторов, и, конечно, мне нужен оператор фильтра в Source. для потока фильтрации, но мне неясно, как определить поток фильтрации, хотя, конечно, мне легко, как фильтровать источник, конечно. Пожалуйста, сообщите

1 Ответ

2 голосов
/ 07 февраля 2020
// Filter elements which are even (use the modulo operator: `%`)
def filterEvenValues: Flow[Int, Int, NotUsed] =
  Flow[Int].filter(number => number % 2 == 0)
...