Ожидание подключения потока веб-сокета клиента перед подключением источника и приемника - PullRequest
0 голосов
/ 05 мая 2018

Я использую akka-streams для настройки клиентского веб-сокета. Я пытаюсь инкапсулировать настройки в методе со следующей подписью:

def createConnectedWebSocket(url: String): Flow[Message, Message, _]

Понятно, как создать поток веб-сокетов, но он еще не подключен:

val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(WebSocketRequest(url))

Сначала я хочу Await обновить ответ в будущем, а затем вернуть поток сокетов. Однако, чтобы получить будущее, я должен материализовать поток и для этого мне нужно соединить Source и Sink. Но за это должен отвечать другой класс адаптера, например тот, который сериализует и десериализует объекты json и предоставляет Flow[JsValue, JsValue, _]. Не нужно беспокоиться о соединении и, возможно, о повторном соединении, когда соединение потеряно (это поведение станет частью более сложной версии моего метода, как только мне удастся написать его). Надо иметь дело только с простым Flow.

Мне удалось достичь части того, что я хочу, используя концентраторы:

val mergeHubSource = MergeHub.source[Message](perProducerBufferSize = 16)
val broadcastHubSink = BroadcastHub.sink[Message](bufferSize = 16)

val ((messageSink, upgradeResponse), messageSource) =
  mergeHubSource
    .viaMat(webSocketFlow)(Keep.both)
    .toMat(broadcastHubSink)(Keep.both)
    .run()

Так что теперь у меня есть Source и Sink, которые я могу объединить в Flow и вернуть его. Проблема в том, что меня не интересует функциональность хаба. Когда я подключаю Source к результирующему Flow и закрываю его, это должно распространяться на сокет, то есть сокет должен закрываться. При использовании MergeHub он остается открытым, чтобы можно было принимать новые источники.

Возможно ли это? Я думаю, что мог бы преодолеть разрыв с кастомными актерами, но мне кажется, что я здесь заново изобретаю что-то, что, вероятно, уже реализовано в другой форме.

1 Ответ

0 голосов
/ 09 мая 2018

Я нашел решение, используя SourceRef и SinkRef. Хотя они предназначены для преодоления разрыва между двумя машинами, их можно использовать и здесь.

val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(WebSocketRequest(someUrl))

val (sinkRefFuture, sourceRefFuture) =
  StreamRefs.sinkRef[In]()
    .viaMat(f)(Keep.left)
    .toMat(StreamRefs.sourceRef[Out]())(Keep.both)
    .run()

val flow = Flow.fromSinkAndSource(await(sinkRefFuture), await(sourceRefFuture))

с await() определяется, например, так:

def await[T, F <: T](f: Future[F]): T = Await.result(f, 3.seconds)

При этом я подумал, что на самом деле лучше, по крайней мере, в моем случае, не материализовать сокет заранее. Таким образом, тот, кто его использует, также может позаботиться о повторном подключении. Сейчас я прохожу вокруг фабрики потоков, которая создает новые экземпляры веб-сокета Flow (может быть, только один раз материализован) по требованию.

...