Я пытаюсь выполнить последнее задание (называемое «Реактивные последователи») в курсе EFPL - «Программирование реактивных систем» на платформе EDx.
Мне удалось выполнить все функции, кроме outgoingFlow
.
Мне кажется, что я должен каким-то образом создать новый источник из существующего потока, и после некоторого прочтения я все еще не понял, как выполнить поток для генерации элементов для нового источника.
Я пытался использовать mapConcat
, но безуспешно.
Я думаю, что существующий поток таков:
eventParserFlow
.via(followersFlow)
.filter(p => isNotified(userId)(p))
Типы для существующих Flow
s и моей предполагаемой реализации outgoingFlow
можно увидеть здесь:
val eventParserFlow: Flow[ByteString, Event, NotUsed]
val followersFlow: Flow[Event, (Event, Followers), NotUsed]
def outgoingFlow(userId: Int): Source[ByteString, NotUsed] = {
eventParserFlow
.via(followersFlow)
.filter(p => isNotified(userId)(p))
.mapConcat { case (e, _) => e.render }
???
}
Может кто-нибудь указать мне на чтение или пример того, как мне решить подобную проблему в Akka, пожалуйста?