Как интегрировать akka streams Websocket с Actor для поддержки противодавления - PullRequest
0 голосов
/ 12 февраля 2019

Я хочу добиться интеграции между Akso Streams Websocket и Actor, который поддерживает противодавление.

Мой протокол веб-сокетов поддерживает:

  • шаблон запроса / ответа, где каждый запрос приводит кровно в одном Ответе
  • Шаблон события, где Событие может быть опубликовано в WS в любое время

Из-за шаблона события я не могу использовать простой Flow, но нужно разделитьв Sink и Source, чтобы я мог отправить Событие без получения Сообщения через WS, по крайней мере, так я его понимаю.

В будущем Запросы, полученные на WS, будутпересылается через grpc какой-либо службе, и когда эта служба отвечает, этот ответ также должен быть возвращен через WS.В идеале я хочу иметь противодавление во всей этой цепочке.

Моя текущая реализация простого протокола Ping / Pong выглядит следующим образом, но

  • Я не уверен, что это лучшееспособ реализовать это.
  • Я не уверен, что делать в случаях ошибок метода offer в Actor
  • Я не уверен, если flatMapConcat TextMessage.textStreamпуть, когда я хочу десериализовать TextMessage
  • Я не уверен, смогу ли я позже поддерживать противодавление, когда я отправляю сообщения от Actor через grpc другому сервису

Я был бы очень признателен, если бы кто-нибудь мог просмотреть мой код и сказать мне, что я могу сделать лучше здесь.

def wsFlow:Flow[Message, Message, NotUsed] = {

  // the actor that is used to handle a single WS
  val ws:ActorRef = system.actorOf(WebSocketActor.props())

  // used to send messages via the WS to the client
  val wsSender:Source[Message, NotUsed] =
    Source
      .queue(bufferSize = 500, overflowStrategy = OverflowStrategy.backpressure)
      .map { response:Response =>
        response match {
          case pr:PingResponse => TextMessage(pr.toJson.compactPrint)
        }
      }
      .mapMaterializedValue { wsQueue =>
        // the wsQueue is used to send messages to the client
        ws ! WsConnect(wsQueue)
        NotUsed // dont expose the wsQueue, change materialized value to NotUsed
      }

  // used to receive messages via the WS from the client
  val wsHandler:Sink[Message, NotUsed] =
    Flow[Message]
      .flatMapConcat {
        case tm:TextMessage => tm.textStream
        case bm:BinaryMessage =>
          // ignore binary messages but drain content to avoid the stream being clogged
          bm.dataStream.runWith(Sink.ignore)
          Source.empty
      }
      .map(text => JsonParser(text).convertTo[Request])
      .to(Sink.actorRef(ws, WsDisconnect))

  Flow.fromSinkAndSource(wsHandler, wsSender)
}
class WebSocketActor(implicit ec:ExecutionContext) extends Actor {
  private var wsQueue:Option[SourceQueueWithComplete[Response]] = None

  override def receive: Receive = {
    case WsConnect(ref) => wsQueue = Some(ref)

    case WsDisconnect =>
      wsQueue.foreach(_.complete())
      wsQueue = None

    case ping:PingRequest =>
      val response = PingResponse(200, ping.clientRequestId, Version(1))
      wsOffer(response)
  }

  private def wsOffer(msg:Response):Unit = wsQueue.foreach(queue => {
    queue.offer(msg).foreach {
      case QueueOfferResult.Enqueued    ⇒ ()
      case QueueOfferResult.Dropped     ⇒ ???
      case QueueOfferResult.Failure(ex) ⇒ ???
      case QueueOfferResult.QueueClosed ⇒ ???
    }
  })
}
...