Я создал тему lagom в моем сервисе.
def addingsTopic(): Topic[ProcessDefinitionAdded]
дескриптор:
override def descriptor: Descriptor = {
import Service._
named(ProductionService.TOPIC_NAME).withCalls(
pathCall(pathProcessDef(":id"), getProcessDefinition _)
).withTopics(
topic("processdef", addingsTopic _ )
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[ProcessDefinitionAdded](_.entityId.toString)
),
topic("productionserviceorder", ordersTopic _)
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[OrderAddedTopic](_.entityId.toString)
)
).withAutoAcl(true)
}
Пока здесь все ясно.Теперь у меня есть игровая веб-страница.У меня в контроллере есть Websocket.
val k = productionService.addingsTopic().subscribe.atMostOnceSource.mapAsync(1){e ⇒
productionService.getProcessDefinition(e.entityId).invoke().map{e ⇒
Json.toJson(e)(ProcessDefinitionResponse.format)
}
}.toMat(BroadcastHub.sink(256) )(Keep.right).run().recover{
case e ⇒ println(e)
Json.toJson("error "+e)
}
val f = Flow.fromSinkAndSource(Sink.ignore,k.concat(Source.maybe))
def fetchProcessDefinition() = WebSocket.accept[String,JsValue] { req ⇒
f
}
Мой веб-сайт JavaScript использует это для подключения:
<script>
try{
console.log("START");
var socket = new WebSocket("ws://localhost:9000/processdef/stream");
socket.onopen = function(s){
console.log("OPEN");
console.log(s);
};
socket.onerror = function(error){
console.log("ERROR: ");
console.log(error);
};
socket.onmessage = function(msg){
console.log("Message:");
console.log(JSON.stringify(msg.data));
};
socket.onclose = function(closeevent){
console.log("CLOSE:");
console.log(closeevent);
};
socket.send("")
}catch (e) {
console.error(e);
}
</script>
Я загружаю страницу на localhost после sbt runAll.Соединение веб-сокетов занимает некоторое время (иногда несколько минут, а иногда и несколько секунд).Через ~ 90 секунд консоль напечатает, что соединение закрыто.В IDE я получаю эту ошибку:
2018-10-18T10:54:16.655Z [error] akka.actor.ActorSystemImpl [sourceThread=application-akka.actor.default-dispatcher-213, akkaSource=akka.actor.ActorSystemImpl(application), sourceActorSystem=application, akkaTimestamp=10:54:16.654UTC] - Websocket handler failed with The connection closed with error: The connection was reset by peer
akka.stream.StreamTcpException: The connection closed with error: The connection was reset by peer
2018-10-18T10:54:16.674Z [error] akka.actor.ActorSystemImpl [sourceThread=application-akka.actor.default-dispatcher-231, akkaSource=akka.actor.ActorSystemImpl(application), sourceActorSystem=application, akkaTimestamp=10:54:16.674UTC] - Websocket handler failed with The connection closed with error: The connection was reset by peer
akka.stream.StreamTcpException: The connection closed with error: The connection was reset by peer
Это очень напряженно.У меня есть ошибки в моем коде?
Спасибо за любую помощь.