Я немного растерялся, используя библиотеки akka-http для создания сервера.Мне нужно установить следующее соединение:
- Существует один сервер и n клиентов (n <5) </li>
- Иногда клиенты отправляют команду на сервер, сервер оценивает/ делегирует команду и отвечает клиенту
- Постоянные широковещательные сообщения от сервера всем клиентам
Учитывая, что:
- мой сервер долженуправлять несколькими «сессиями», подключенными через веб-сокет
Вот моя конечная точка веб-сокета:
path("socket") {
handleWebSocketMessages(listen())
}
А вот и метод listen()
:
// stores offers to broadcast to all clients
private var offers: List[TextMessage => Unit] = List()
def listen(): Flow[Message, Message, NotUsed] = {
val inbound: Sink[Message, Any] = Sink.foreach(m => /* handle the message */) // (*)
val outbound: Source[Message, SourceQueueWithComplete[Message]] =
Source.queue[Message](16, OverflowStrategy.fail)
Flow.fromSinkAndSourceMat(inbound, outbound)((_, outboundMat) => {
offers ::= outboundMat.offer
NotUsed
})
}
def sendText(text: String): Unit = {
for (connection <- offers) connection(TextMessage.Strict(text))
}
При таком подходе я могу зарегистрировать несколько клиентов и ответить на них, используя метод sendText(text: String)
.Но есть одна большая проблема: как мне ответить только конкретному клиенту после того, как я оценил его команду.(см. (*)
)
[Еще одна вещь, которая меня беспокоит, это то, что offers
- это переменная, которая кажется неправильной при программировании исключительно на FP, но я могу принять это, если все остальное работает]
Редактировать:
Чтобы уточнить, я в основном должен иметь возможность реализовать метод, похожий на этот:
def onMessageReceived(m: Message, answer: TextMessage => Unit): Unit = {
val response: TextMessage = handleMessage(m)
answer(response)
}
Но я не могу понятьгде вызвать этот метод в моем потоке websocket.