Lagom Framework Тема Соединение Websocket Timeout закрыто с ошибкой - PullRequest
0 голосов
/ 18 октября 2018

Я создал тему lagom в моем сервисе. def addingsTopic(): Topic[ProcessDefinitionAdded] дескриптор:

override def descriptor: Descriptor = {
    import Service._
    named(ProductionService.TOPIC_NAME).withCalls(
      pathCall(pathProcessDef(":id"), getProcessDefinition _)
    ).withTopics(
      topic("processdef", addingsTopic _ )
        .addProperty(
          KafkaProperties.partitionKeyStrategy,
          PartitionKeyStrategy[ProcessDefinitionAdded](_.entityId.toString)
        ),
      topic("productionserviceorder", ordersTopic _)
        .addProperty(
          KafkaProperties.partitionKeyStrategy,
          PartitionKeyStrategy[OrderAddedTopic](_.entityId.toString)
        )
    ).withAutoAcl(true)
  }

Пока здесь все ясно.Теперь у меня есть игровая веб-страница.У меня в контроллере есть Websocket.

val k = productionService.addingsTopic().subscribe.atMostOnceSource.mapAsync(1){e ⇒
    productionService.getProcessDefinition(e.entityId).invoke().map{e ⇒
      Json.toJson(e)(ProcessDefinitionResponse.format)
    }
  }.toMat(BroadcastHub.sink(256) )(Keep.right).run().recover{
    case e ⇒ println(e)
      Json.toJson("error "+e)
  }
  val f = Flow.fromSinkAndSource(Sink.ignore,k.concat(Source.maybe))

  def fetchProcessDefinition() = WebSocket.accept[String,JsValue] { req ⇒
    f
  }

Мой веб-сайт JavaScript использует это для подключения:

<script>
        try{
            console.log("START");
            var socket = new WebSocket("ws://localhost:9000/processdef/stream");

            socket.onopen = function(s){
                console.log("OPEN");
                console.log(s);
            };

            socket.onerror = function(error){
                console.log("ERROR: ");
                console.log(error);
            };

            socket.onmessage = function(msg){
                console.log("Message:");
                console.log(JSON.stringify(msg.data));
            };

            socket.onclose = function(closeevent){
                console.log("CLOSE:");
                console.log(closeevent);
            };

            socket.send("")
        }catch (e) {
            console.error(e);
        }
    </script>

Я загружаю страницу на localhost после sbt runAll.Соединение веб-сокетов занимает некоторое время (иногда несколько минут, а иногда и несколько секунд).Через ~ 90 секунд консоль напечатает, что соединение закрыто.В IDE я получаю эту ошибку:

2018-10-18T10:54:16.655Z [error] akka.actor.ActorSystemImpl [sourceThread=application-akka.actor.default-dispatcher-213, akkaSource=akka.actor.ActorSystemImpl(application), sourceActorSystem=application, akkaTimestamp=10:54:16.654UTC] - Websocket handler failed with The connection closed with error: The connection was reset by peer
akka.stream.StreamTcpException: The connection closed with error: The connection was reset by peer
2018-10-18T10:54:16.674Z [error] akka.actor.ActorSystemImpl [sourceThread=application-akka.actor.default-dispatcher-231, akkaSource=akka.actor.ActorSystemImpl(application), sourceActorSystem=application, akkaTimestamp=10:54:16.674UTC] - Websocket handler failed with The connection closed with error: The connection was reset by peer
akka.stream.StreamTcpException: The connection closed with error: The connection was reset by peer

Это очень напряженно.У меня есть ошибки в моем коде?

Спасибо за любую помощь.

...