Как создать Source из Flow в Акка-стрим?(Программирование активности реактивных систем) - PullRequest
0 голосов
/ 02 июля 2019

Я пытаюсь выполнить последнее задание (называемое «Реактивные последователи») в курсе EFPL - «Программирование реактивных систем» на платформе EDx.

Мне удалось выполнить все функции, кроме outgoingFlow.

Мне кажется, что я должен каким-то образом создать новый источник из существующего потока, и после некоторого прочтения я все еще не понял, как выполнить поток для генерации элементов для нового источника.

Я пытался использовать mapConcat, но безуспешно.

Я думаю, что существующий поток таков:

eventParserFlow
.via(followersFlow)
.filter(p => isNotified(userId)(p))

Типы для существующих Flow s и моей предполагаемой реализации outgoingFlow можно увидеть здесь:

val eventParserFlow: Flow[ByteString, Event, NotUsed]
val followersFlow: Flow[Event, (Event, Followers), NotUsed]

def outgoingFlow(userId: Int): Source[ByteString, NotUsed] = {
  eventParserFlow
    .via(followersFlow)
    .filter(p => isNotified(userId)(p))
    .mapConcat { case (e, _) => e.render }
  ???
}

Может кто-нибудь указать мне на чтение или пример того, как мне решить подобную проблему в Akka, пожалуйста?

1 Ответ

0 голосов
/ 03 июля 2019

просто примечание - ТАК не лучший ресурс для подобных вопросов. то, что вы должны использовать, это обсуждение раздел в соответствующем курсе edx


относительно вашего вопроса - я не дам вам четкого ответа, только несколько подсказок.

в akka-streams вы не можете просто создать Source из Flow. Flow отвечает за преобразование, а Source создает новые события. в своем назначении вы просто забыли использовать одно из доступных значений.

  1. внимательно прочитайте комментарии в class Server (не object).
  2. присмотритесь к val (inboundSink, broadcastOut) = ... и попытайтесь выяснить, для чего предназначен каждый из vals и как они связаны друг с другом и с самим приложением. будет полезно понять, какие у них типы

этих подсказок должно быть достаточно, чтобы понять, как реализовать outgoingFlow, то есть Source[ByteString, NotUsed]

...