Я использую akka-streams для настройки клиентского веб-сокета. Я пытаюсь инкапсулировать настройки в методе со следующей подписью:
def createConnectedWebSocket(url: String): Flow[Message, Message, _]
Понятно, как создать поток веб-сокетов, но он еще не подключен:
val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest(url))
Сначала я хочу Await
обновить ответ в будущем, а затем вернуть поток сокетов. Однако, чтобы получить будущее, я должен материализовать поток и для этого мне нужно соединить Source
и Sink
. Но за это должен отвечать другой класс адаптера, например тот, который сериализует и десериализует объекты json и предоставляет Flow[JsValue, JsValue, _]
. Не нужно беспокоиться о соединении и, возможно, о повторном соединении, когда соединение потеряно (это поведение станет частью более сложной версии моего метода, как только мне удастся написать его). Надо иметь дело только с простым Flow
.
Мне удалось достичь части того, что я хочу, используя концентраторы:
val mergeHubSource = MergeHub.source[Message](perProducerBufferSize = 16)
val broadcastHubSink = BroadcastHub.sink[Message](bufferSize = 16)
val ((messageSink, upgradeResponse), messageSource) =
mergeHubSource
.viaMat(webSocketFlow)(Keep.both)
.toMat(broadcastHubSink)(Keep.both)
.run()
Так что теперь у меня есть Source
и Sink
, которые я могу объединить в Flow
и вернуть его. Проблема в том, что меня не интересует функциональность хаба. Когда я подключаю Source
к результирующему Flow
и закрываю его, это должно распространяться на сокет, то есть сокет должен закрываться. При использовании MergeHub
он остается открытым, чтобы можно было принимать новые источники.
Возможно ли это? Я думаю, что мог бы преодолеть разрыв с кастомными актерами, но мне кажется, что я здесь заново изобретаю что-то, что, вероятно, уже реализовано в другой форме.