Как мне использовать Source.asSubscriber
, чтобы обернуть реактивный слушатель?Я не понимаю его преимущества.
Я пытаюсь создать Source[T]
для asynchttpclient WebSocket.Вот мой код:
def createWsObservable(url: String, onStartAction: Option[WebSocket ⇒ Unit]): Source[WsMessage, KillSwitch] =
Source.asSubscriber[WsMessage].mapMaterializedValue { subs: Subscriber[WsMessage] ⇒
val listener: WebSocketListener = new WebSocketListener() {
override def onOpen(ws: WebSocket): Unit =
subs.onNext(WsOpen(ws))
override def onClose(ws: WebSocket, code: Int, reason: String): Unit =
subs.onComplete()
override def onBinaryFrame(payload: Array[Byte], finalFragment: Boolean, rsv: Int): Unit =
// Doing bunch of stuff here
subs.onNext(...)
override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int): Unit =
// Doing bunch of stuff here
subs.onNext(...)
override def onError(t: Throwable): Unit =
subs.onError(t)
override def onPongFrame(payload: Array[Byte]): Unit = {
super.onPingFrame(payload)
}
}
val websocket =
asyncHttpClient
.prepareGet(url)
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build).get
new KillSwitch {
override def shutdown(): Unit = websocket.sendCloseFrame()
override def abort(ex: Throwable): Unit = websocket.sendCloseFrame()
}
}
При первом событии я получаю исключение:
java.lang.IllegalStateException: spec violation: onNext was signaled from upstream without demand
at akka.stream.impl.VirtualProcessor.rec$5(StreamLayout.scala:239)
at akka.stream.impl.VirtualProcessor.onNext(StreamLayout.scala:243)
at ingestion.NettyClientWrapper$$anon$2.onOpen(NettyClientWrapper.scala:55)
Возможно, Source.asSubscriber
плохой выбор для меня?Что я должен сделать, чтобы включить подписчика реактивных потоков в Источник Акки?