Ошибка нескольких подписчиков при использовании источника ActorPublisher и переподключении WebSocketClientFlow - PullRequest
0 голосов
/ 18 мая 2018

Я использую Akka HTTP для подключения Actor к клиенту WebSocket .Актор реализует ActorPublisher[A] и публикует экземпляры A в источнике, который читается в потоке WebSocket клиента.

class Actor(...) extends Actor with ActorPublisher[A]

Код, который создает поток, следующий:

import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl._

val actor = ...

// This method gets the actor and builds its the Akka Source
// Then opens the WebSocket using that source
def connect(): Unit = {
    val publisher = ActorPublisher[A](actor)
    val source = Source.fromPublisher(publisher)
    openWebSocket(source)
}

def openWebSocket(source: Source[A, NotUsed]): Unit = {
    val flow = Http().webSocketClientFlow(WebSocketRequest(URL))
    val (response, closed) = source
        .map { instanceOfA =>
            // Transform the instances of A to a message to send it through the websocket
            TextMessage(instanceOfA.asJson)
        }
        .viaMat(flow)(Keep.right)
        // Simple sink to print all incoming messages
        .toMat(Sink.foreach {
            case message: TextMessage.Strict => println(message.text)
        })(Keep.both)
        .run()

    // Reconnect if the socket is closed
    closed.foreach { _ =>
        connect()
    }
}

A webSocketClientFlow построен с источником актера и простым не связанным приемником.Это прекрасно работает, пока сокет не закрыт.В этот момент метод connect вызывается снова.Он создает другой источник из того же агента и создает новый поток WebSocket.Проблема в том, что закрытый поток WebSocket по-прежнему подписан на актера, и я получаю следующую ошибку при создании нового:

java.lang.IllegalStateException: ActorPublisher only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.12)
    at akka.stream.impl.ReactiveStreamsCompliance$.rejectAdditionalSubscriber(ReactiveStreamsCompliance.scala:59)
    at akka.stream.actor.ActorPublisher.aroundReceive(ActorPublisher.scala:312)
    at akka.stream.actor.ActorPublisher.aroundReceive$(ActorPublisher.scala:272)
    ...

Как мне сообщить старому потоку или источнику отписаться от агента?В настоящее время я остановил агента и заменил его новым, но в этом нет необходимости.

...