Как я могу читать несколько веб-сокетов в одном потоке Akka-http? - PullRequest
0 голосов
/ 13 июля 2020

В настоящее время я практикую Akka-http, пытаясь установить sh несколько подключений через веб-узлы. Мой код для создания клиентского потока веб-сокетов (фрагмент) выглядит так:

val webSocketFlow =
  Http().webSocketClientFlow(WebSocketRequest(url), settings = customSettings)

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .viaMat(decoder)(Keep.left)
    .toMat(sink)(Keep.both)
    .run()

В настоящее время это отлично работает, если у меня есть один URL-адрес. Мне интересно, как я могу масштабировать это для подключения к нескольким URL-адресам. Так, например, если у меня есть неопределенный список конечных точек веб-сокетов List("ws://localhost:8080/foo", "ws://localhost:8080/bar", "ws://localhost:8080/baz").

, я подумал о добавлении нового потока для каждого URL-адреса, но что, если у меня есть длинный список конечных точек / URL-адресов веб-сокетов. Тогда это становится громоздким и явно ручным. Я также подумал о том, чтобы обернуть это в функцию и вызвать для каждого URL в заданном итеративном элементе. Но это тоже было ужасно.

Есть ли способ, чтобы все пулы подключений были объединены в один поток (или что-то вроде этого)? Также приветствуются дальнейшие чтения. Есть ли способ пометить входящие сообщения с помощью URL-адреса, с которого они приходят?

Обновление: для пояснения, я читаю только из веб-сокетов (только на стороне клиента ) и не отправлять обратно сообщения.

Ответы [ 2 ]

2 голосов
/ 13 июля 2020

Это должно сработать (код записывается в текстовое поле ...):

def taggedWebsocketForUrl(url: String, tag: Int): Source[(Int, Message), Future[WebSocketUpgradeResponse]] =
  outgoing.viaMat(Http().webSocketClientFlow(WebSocketRequest(url), settings = customSettings))(Keep.right).map(tag -> _)

val websocketMergedSource: Source[(Int, Message), Seq[Future[WebSocketUpgradeResponse]]] = {
  // You could replace this with a mess of headOptions etc., but...
  if (websocketUrls.isEmpty) Source.empty[(Int, Message)].mapMaterializedValue(_ => Seq(Future.failed(new NoSuchElementException("no websocket URLs"))))
  else {
    val first: Source[(Int, Message), List[Future[WebSocketUpgradeResponse]]] =
      taggedWebsocketForUrl(websocketUrls.head, 0).mapMaterializedValue(List(_))
    if (websocketUrls.tail.isEmpty) first
    else {
      websocketUrls.tail.foldLeft(first -> 1) {
        (acc, url) =>
          val newSource = acc._1.mergeMat(taggedWebsocketForUrl(url, acc._2)) {
            (futs: List[Future[WebSocketUpgradeResponse]], fut: Future[WebSocketUpgradeResponse]) =>
              fut :: futs // Will reverse at the end...
          }
          newSource -> (acc._2 + 1)
      }._1.mapMaterializedValue(_.reverse)
    }
  }
}

При этом у вас будет много ответов об обновлении (вы можете mapMaterializedValue(Future.sequence _) объединить их в Future[Seq[WebsocketUpgradeResponse]], который выйдет из строя, если какой-либо из них выйдет из строя). Сообщения с n -го URL-адреса в списке будут помечены n.

Обратите внимание, что websocketUrls, являющийся List, указывает на создание сворачивания: если есть n URL-адреса, сообщения с первого URL-адреса будут этапами слияния от go до n -1, а последний URL-адрес будет go только через 1 этап слияния, поэтому вы хотите поместить URL-адреса, которые, как вы ожидаете, создадут больше трафика c ближе к концу списка.

Альтернативным, более эффективным подходом было бы использование IndexedSeq, например Vector или Array, чтобы разделять и властвовать, чтобы построить дерево merge s.

Использование Akka Streams GraphDSL также дало бы вам много контроля, но я бы предпочел использовать это только в крайнем случае.

1 голос
/ 13 июля 2020

Что вам нужно, так это какой-то способ объединить различные потоки WebSocket, чтобы вы могли обрабатывать входящие сообщения, как если бы они пришли из одного источника.

Поскольку вам не нужно отправлять какие-либо данные, а только получать реализация проста.

Давайте начнем создавать функцию, которая будет создавать WebSocket Source для данного uri:

def webSocketSource(uri: Uri): Source[Message, Future[WebSocketUpgradeResponse]] = {
  Source.empty.viaMat(Http().webSocketClientFlow(uri))(Keep.right)
}

Поскольку вы не заботитесь об отправке данных, функция немедленно закрывает out канал, предоставив пустой Source. Результатом является Источник, содержащий сообщение, прочитанное из WebSocket.

На этом этапе мы можем использовать эту функцию для создания выделенного источника для каждого uri:

val wsSources: List[Source[Message, NotUsed]] = uris.map { uri =>
  webSocketSource(uri).mapMaterializedValue { respFuture =>
    respFuture.map {
      case _: ValidUpgrade => log.debug(s"Websocket upgrade for [${uri}] successful")
      case err: InvalidUpgradeResponse => log.error(s"Websocket upgrade for [${uri}] failed: ${err.cause}")
    }

    NotUsed
  }
}

Здесь нам нужно как-то позаботьтесь о материализованных ценностях, поскольку их невозможно (или, по крайней мере, непросто) объединить, поскольку мы не знаем, сколько их. Итак, здесь мы go с простейшим подходом - просто ведение журнала.

Теперь, когда у нас есть готовые источники, мы можем приступить к их объединению:

val mergedSource: Source[Message, NotUsed] = wsSources match {
  case s1 :: s2 :: rest => Source.combine(s1, s2, rest: _*)(Merge(_))
  case s1 :: Nil => s1
  case Nil => Source.empty[Message]
}

Идея в том, что в случае у нас есть 2 или более uris, мы фактически выполняем операцию слияния, в противном случае, если у нас есть один, мы просто используем его без каких-либо изменений. Наконец, мы также рассмотрим случай, когда у нас вообще нет uri, предоставив пустой Source, который просто завершит поток без ошибок.

На этом этапе мы можем объединить этот источник с потоками и уже погрузить вас иметь и запустить его:

val done: Future[Done] = mergedSource.via(decoder).toMat(sink)(Keep.right).run

Что возвращает нам единое будущее, которое будет завершено, когда все соединения будут завершены, или потерпят неудачу, как только одно соединение выйдет из строя.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...