Ответить конкретному клиенту с помощью akka-http, а также поддерживать трансляцию - PullRequest
0 голосов
/ 01 февраля 2019

Я немного растерялся, используя библиотеки akka-http для создания сервера.Мне нужно установить следующее соединение:

  • Существует один сервер и n клиентов (n <5) </li>
  • Иногда клиенты отправляют команду на сервер, сервер оценивает/ делегирует команду и отвечает клиенту
  • Постоянные широковещательные сообщения от сервера всем клиентам

Учитывая, что:

  • мой сервер долженуправлять несколькими «сессиями», подключенными через веб-сокет

Вот моя конечная точка веб-сокета:

path("socket") {
  handleWebSocketMessages(listen())
}

А вот и метод listen():

// stores offers to broadcast to all clients
private var offers: List[TextMessage => Unit] = List()

def listen(): Flow[Message, Message, NotUsed] = {
  val inbound: Sink[Message, Any] = Sink.foreach(m => /* handle the message */) // (*)
  val outbound: Source[Message, SourceQueueWithComplete[Message]] =
    Source.queue[Message](16, OverflowStrategy.fail)

  Flow.fromSinkAndSourceMat(inbound, outbound)((_, outboundMat) => {
    offers ::= outboundMat.offer
    NotUsed
  })
}

def sendText(text: String): Unit = {
  for (connection <- offers) connection(TextMessage.Strict(text))
}

При таком подходе я могу зарегистрировать несколько клиентов и ответить на них, используя метод sendText(text: String).Но есть одна большая проблема: как мне ответить только конкретному клиенту после того, как я оценил его команду.(см. (*))

[Еще одна вещь, которая меня беспокоит, это то, что offers - это переменная, которая кажется неправильной при программировании исключительно на FP, но я могу принять это, если все остальное работает]

Редактировать:

Чтобы уточнить, я в основном должен иметь возможность реализовать метод, похожий на этот:

def onMessageReceived(m: Message, answer: TextMessage => Unit): Unit = {
  val response: TextMessage = handleMessage(m)
  answer(response)
}

Но я не могу понятьгде вызвать этот метод в моем потоке websocket.

1 Ответ

0 голосов
/ 04 февраля 2019

Я не совсем уверен, так ли это, но похоже, что это работает:

var actors: List[ActorRef] = Nil

private def wsFlow(implicit materializer: ActorMaterializer): Flow[ws.Message, ws.Message, NotUsed] = {
    val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
      .toMat(BroadcastHub.sink[String])(Keep.both)
      .run()

    actors = actor :: actors

    val wsHandler: Flow[ws.Message, ws.Message, NotUsed] =
      Flow[ws.Message]
        .merge(source)
        .map {
          case TextMessage.Strict(tm) => handleMessage(actor, tm)
          case _ => TextMessage.Strict("Ignored message!")
        }
    wsHandler
  }

  def broadcast(msg: String): Unit = {
    actors.foreach(_ ! TextMessage.Strict(msg))
  }
...