Я использую Akka HTTP для подключения Actor к клиенту WebSocket .Актор реализует ActorPublisher[A]
и публикует экземпляры A
в источнике, который читается в потоке WebSocket клиента.
class Actor(...) extends Actor with ActorPublisher[A]
Код, который создает поток, следующий:
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl._
val actor = ...
// This method gets the actor and builds its the Akka Source
// Then opens the WebSocket using that source
def connect(): Unit = {
val publisher = ActorPublisher[A](actor)
val source = Source.fromPublisher(publisher)
openWebSocket(source)
}
def openWebSocket(source: Source[A, NotUsed]): Unit = {
val flow = Http().webSocketClientFlow(WebSocketRequest(URL))
val (response, closed) = source
.map { instanceOfA =>
// Transform the instances of A to a message to send it through the websocket
TextMessage(instanceOfA.asJson)
}
.viaMat(flow)(Keep.right)
// Simple sink to print all incoming messages
.toMat(Sink.foreach {
case message: TextMessage.Strict => println(message.text)
})(Keep.both)
.run()
// Reconnect if the socket is closed
closed.foreach { _ =>
connect()
}
}
A webSocketClientFlow
построен с источником актера и простым не связанным приемником.Это прекрасно работает, пока сокет не закрыт.В этот момент метод connect
вызывается снова.Он создает другой источник из того же агента и создает новый поток WebSocket.Проблема в том, что закрытый поток WebSocket по-прежнему подписан на актера, и я получаю следующую ошибку при создании нового:
java.lang.IllegalStateException: ActorPublisher only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.12)
at akka.stream.impl.ReactiveStreamsCompliance$.rejectAdditionalSubscriber(ReactiveStreamsCompliance.scala:59)
at akka.stream.actor.ActorPublisher.aroundReceive(ActorPublisher.scala:312)
at akka.stream.actor.ActorPublisher.aroundReceive$(ActorPublisher.scala:272)
...
Как мне сообщить старому потоку или источнику отписаться от агента?В настоящее время я остановил агента и заменил его новым, но в этом нет необходимости.