Я хочу добиться интеграции между Akso Streams Websocket и Actor, который поддерживает противодавление.
Мой протокол веб-сокетов поддерживает:
- шаблон запроса / ответа, где каждый запрос приводит кровно в одном Ответе
- Шаблон события, где Событие может быть опубликовано в WS в любое время
Из-за шаблона события я не могу использовать простой Flow
, но нужно разделитьв Sink
и Source
, чтобы я мог отправить Событие без получения Сообщения через WS, по крайней мере, так я его понимаю.
В будущем Запросы, полученные на WS, будутпересылается через grpc какой-либо службе, и когда эта служба отвечает, этот ответ также должен быть возвращен через WS.В идеале я хочу иметь противодавление во всей этой цепочке.
Моя текущая реализация простого протокола Ping / Pong выглядит следующим образом, но
- Я не уверен, что это лучшееспособ реализовать это.
- Я не уверен, что делать в случаях ошибок метода
offer
в Actor - Я не уверен, если
flatMapConcat
TextMessage.textStream
путь, когда я хочу десериализовать TextMessage
- Я не уверен, смогу ли я позже поддерживать противодавление, когда я отправляю сообщения от Actor через grpc другому сервису
Я был бы очень признателен, если бы кто-нибудь мог просмотреть мой код и сказать мне, что я могу сделать лучше здесь.
def wsFlow:Flow[Message, Message, NotUsed] = {
// the actor that is used to handle a single WS
val ws:ActorRef = system.actorOf(WebSocketActor.props())
// used to send messages via the WS to the client
val wsSender:Source[Message, NotUsed] =
Source
.queue(bufferSize = 500, overflowStrategy = OverflowStrategy.backpressure)
.map { response:Response =>
response match {
case pr:PingResponse => TextMessage(pr.toJson.compactPrint)
}
}
.mapMaterializedValue { wsQueue =>
// the wsQueue is used to send messages to the client
ws ! WsConnect(wsQueue)
NotUsed // dont expose the wsQueue, change materialized value to NotUsed
}
// used to receive messages via the WS from the client
val wsHandler:Sink[Message, NotUsed] =
Flow[Message]
.flatMapConcat {
case tm:TextMessage => tm.textStream
case bm:BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
bm.dataStream.runWith(Sink.ignore)
Source.empty
}
.map(text => JsonParser(text).convertTo[Request])
.to(Sink.actorRef(ws, WsDisconnect))
Flow.fromSinkAndSource(wsHandler, wsSender)
}
class WebSocketActor(implicit ec:ExecutionContext) extends Actor {
private var wsQueue:Option[SourceQueueWithComplete[Response]] = None
override def receive: Receive = {
case WsConnect(ref) => wsQueue = Some(ref)
case WsDisconnect =>
wsQueue.foreach(_.complete())
wsQueue = None
case ping:PingRequest =>
val response = PingResponse(200, ping.clientRequestId, Version(1))
wsOffer(response)
}
private def wsOffer(msg:Response):Unit = wsQueue.foreach(queue => {
queue.offer(msg).foreach {
case QueueOfferResult.Enqueued ⇒ ()
case QueueOfferResult.Dropped ⇒ ???
case QueueOfferResult.Failure(ex) ⇒ ???
case QueueOfferResult.QueueClosed ⇒ ???
}
})
}