Что вам нужно, так это какой-то способ объединить различные потоки WebSocket, чтобы вы могли обрабатывать входящие сообщения, как если бы они пришли из одного источника.
Поскольку вам не нужно отправлять какие-либо данные, а только получать реализация проста.
Давайте начнем создавать функцию, которая будет создавать WebSocket Source для данного uri:
def webSocketSource(uri: Uri): Source[Message, Future[WebSocketUpgradeResponse]] = {
Source.empty.viaMat(Http().webSocketClientFlow(uri))(Keep.right)
}
Поскольку вы не заботитесь об отправке данных, функция немедленно закрывает out канал, предоставив пустой Source. Результатом является Источник, содержащий сообщение, прочитанное из WebSocket.
На этом этапе мы можем использовать эту функцию для создания выделенного источника для каждого uri:
val wsSources: List[Source[Message, NotUsed]] = uris.map { uri =>
webSocketSource(uri).mapMaterializedValue { respFuture =>
respFuture.map {
case _: ValidUpgrade => log.debug(s"Websocket upgrade for [${uri}] successful")
case err: InvalidUpgradeResponse => log.error(s"Websocket upgrade for [${uri}] failed: ${err.cause}")
}
NotUsed
}
}
Здесь нам нужно как-то позаботьтесь о материализованных ценностях, поскольку их невозможно (или, по крайней мере, непросто) объединить, поскольку мы не знаем, сколько их. Итак, здесь мы go с простейшим подходом - просто ведение журнала.
Теперь, когда у нас есть готовые источники, мы можем приступить к их объединению:
val mergedSource: Source[Message, NotUsed] = wsSources match {
case s1 :: s2 :: rest => Source.combine(s1, s2, rest: _*)(Merge(_))
case s1 :: Nil => s1
case Nil => Source.empty[Message]
}
Идея в том, что в случае у нас есть 2 или более uris, мы фактически выполняем операцию слияния, в противном случае, если у нас есть один, мы просто используем его без каких-либо изменений. Наконец, мы также рассмотрим случай, когда у нас вообще нет uri, предоставив пустой Source, который просто завершит поток без ошибок.
На этом этапе мы можем объединить этот источник с потоками и уже погрузить вас иметь и запустить его:
val done: Future[Done] = mergedSource.via(decoder).toMat(sink)(Keep.right).run
Что возвращает нам единое будущее, которое будет завершено, когда все соединения будут завершены, или потерпят неудачу, как только одно соединение выйдет из строя.