Я хочу создать сервер WebSocket, ожидая MQTT-тему от клиента и непрерывно отправлять клиенту сообщения по темам. Вот что я сделал в Scala
def echoService(): Flow[Message, Message, _] = Flow[Message].map{
case TextMessage.Strict(txt) =>
val temp = password.split("/")
if (temp.nonEmpty && temp.length < 4) {
val res = new ListBuffer[String]
val persistence = new MemoryPersistence
val client = new MqttClient(url, MqttClient.generateClientId(), persistence)
val MQtt_Option: MqttConnectOptions = new MqttConnectOptions
MQtt_Option.setCleanSession(true)
MQtt_Option.setUserName(userName)
MQtt_Option.setPassword(password.toCharArray)
client.connect(MQtt_Option)
client.subscribe(txt)
client.setCallback(
new MqttCallback() {
override def connectionLost(cause: Throwable): Unit = {
}
@throws[Exception]
override def messageArrived(topic: String, message: MqttMessage): Unit = {
TextMessage(message.getPayload.toString)
}
override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
}
})
}
TextMessage("wrong request!")
}
val websocketRoute = get {
handleWebSocketMessages(echoService())
}
Http().bindAndHandle(websocketRoute, "127.0.0.1", 8080)
Await.result(system.whenTerminated, Duration.Inf)
В приведенном выше коде логика заключается в создании соединения MQTT для каждого запроса клиента WebSocket. Он будет слушать тему и непрерывно отправлять MQTT-сообщения клиенту WebSocket.
Но при запросе к серверу я просто получаю неправильный запрос! сообщение! Как я должен изменить его, чтобы отправлять сообщения MQTT непрерывно