Должна ли каждая часть моего потока Akka-http выводить сообщения типа Message? - PullRequest
0 голосов
/ 09 июля 2020

Я пытаюсь использовать поток через WebSockets, и я хотел бы проанализировать полученные сообщения и преобразовать их, прежде чем они достигнут приемника. Однако я продолжаю получать ошибки всякий раз, когда использую Source или Sink, которые не принимают Message в качестве входных данных.

Согласно документации :

Поэтому соединение WebSocket моделируется либо как то, что вы подключаете к потоку [Message, Message, Mat], либо как к потоку [Message, Message, Mat], к которому вы подключаете Source [Message, Mat] и Sink [Message, Mat]. to.

Я все еще не уверен, правильно ли я понимаю. Моя путаница заключается в следующем: должны ли источники, потоки и приемники с использованием веб-сокетов Akka-http всегда передавать тип Message? Есть ли способ обойти это? И, что наиболее важно, какая здесь лучшая практика?

Я сформулировал упрощенный фрагмент моего кода (который не предназначен для запуска), а скорее должен помочь в осмыслении моего вопроса.

val outgoing = Source.maybe[Message]

val decoder = Flow[Message] map {x => TextMessage("Hello from decode")}

// Do I need to pass a Message here?
val wrongDecoder = Flow[String] map {x => "Help :( I can't Sink! Maybe because I'm String?"}

val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {case message: TextMessage.Strict => message.text}

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .viaMat(decoder)(Keep.left)
    .viaMat(wrongDecoder)(Keep.left) // IDE compiler tells me it expected a Graph but found a Flow?
    .toMat(sink)(Keep.both)
    .run()

1 Ответ

1 голос
/ 09 июля 2020

Да, поток должен принимать и выдавать Message s.

val source: Source[Message, SoMat] = ???

val flow: Flow[Message, Message, FMat] = ???

val sink: Sink[Message, SiMat] = ???

source.via(flow).runWith(sink)  // Ignoring which materializations you'd actually want to keep

Это кажется ограничительным, но обратите внимание на некоторые подписи:

// in Source (with types at least partially expanded)
def map[T](f: Message => T): Source[T, SoMat]

// in Flow (with types at least partially expanded)
def map[T](f: Message => T): Flow[Message, T, FMat]

// in Sink (with types at least partially expanded)
def contramap[T](f: T => Message): Sink[T, SiMap]

т.е. вы можете взять Source[Message] и сопоставьте его с Source любого типа по вашему выбору, если у вас есть функция от Message к этому типу. Точно так же вы можете взять Sink[Message] и сопоставить его с Sink любого типа. А для Flow вы можете сопоставить его с любым типом и снова сопоставить с Message.

Например, вы могли бы

val outgoing = Source.maybe[Message]
val decoder: Flow[Message, String, NotUsed] = Flow[Message].mapConcat { m =>
  m match {
    case TextMessage.Strict(msg) => List(msg)
    case _ => Nil
  }
}

val stringProcessor: Flow[String, String, NotUsed] = Flow[String].map { s => s.replace(':', ';') }

val sink: Sink[String, Future[Done]] = Sink.foreach[Message] {
  case TextMessage.Strict(text) => println(text)
}.contramap[String] { s => TextMessage.Strict(s) }

val websocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))

val (upgradeResponse, closed) =
  outgoing
    .viaMat(websocketFlow)(Keep.right)
    .viaMat(decoder)(Keep.left)
    .viaMat(stringProcessor)(Keep.left)
    .toMat(sink)(Keep.both)
    .run()

Я бы, вероятно, скомбинировал websocketFlow и decoder в один поток от Message до String (или любой другой тип домена, применимый к бизнес-логам c (stringProcessor в данном случае)), используя contramapped Sink, как указано выше.

val stringsFromWebsocket: Flow[Message, String, Future[WebsocketUpgradeResponse]] =
  Http()
    .webSocketClientFlow(WebSocketRequest(uri))
    .viaMat(decoder)(Keep.left)

val (upgradeResponse, closed) =
  outgoing
    .viaMat(stringsFromWebsocket)(Keep.right)
    .viaMat(stringProcessor)(Keep.left)
    .toMat(sink)(Keep.both)
    .run()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...