Akka-http WebSocket, как непрерывно отправлять сообщения из темы MQTT - PullRequest
0 голосов
/ 15 апреля 2019

Я хочу создать сервер WebSocket, ожидая MQTT-тему от клиента и непрерывно отправлять клиенту сообщения по темам. Вот что я сделал в Scala

def echoService(): Flow[Message, Message, _] = Flow[Message].map{
      case TextMessage.Strict(txt) =>
        val temp = password.split("/")
        if (temp.nonEmpty && temp.length < 4) {
          val res = new ListBuffer[String]
          val persistence = new MemoryPersistence
          val client = new MqttClient(url, MqttClient.generateClientId(), persistence)
          val MQtt_Option: MqttConnectOptions = new MqttConnectOptions
          MQtt_Option.setCleanSession(true)
          MQtt_Option.setUserName(userName)
          MQtt_Option.setPassword(password.toCharArray)
          client.connect(MQtt_Option)
          client.subscribe(txt)
          client.setCallback(
            new MqttCallback() {
            override def connectionLost(cause: Throwable): Unit = {
            }

            @throws[Exception]
            override def messageArrived(topic: String, message: MqttMessage): Unit = {
              TextMessage(message.getPayload.toString)
            }

            override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
            }
          })
        }
        TextMessage("wrong request!")
    }
val websocketRoute = get {
          handleWebSocketMessages(echoService())
        }

    Http().bindAndHandle(websocketRoute, "127.0.0.1", 8080)
    Await.result(system.whenTerminated, Duration.Inf)

В приведенном выше коде логика заключается в создании соединения MQTT для каждого запроса клиента WebSocket. Он будет слушать тему и непрерывно отправлять MQTT-сообщения клиенту WebSocket. Но при запросе к серверу я просто получаю неправильный запрос! сообщение! Как я должен изменить его, чтобы отправлять сообщения MQTT непрерывно

...