Проблемы WebSocket со Scala и fs2 / cats - PullRequest
0 голосов
/ 24 апреля 2018

Я использую Http4s для подключения службы веб-сокетов, которую я могу использовать для связи между этой серверной службой и пользовательским интерфейсом (обновления состояния трубопровода и% выполнения для пакетного задания).

Я использую Пример веб-сокета BlazeBuilder для настройки службы.

Служба работает, но я пытаюсь создать сообщения сокета изнутриэкземпляр класса.Например, я хочу создать экземпляр работника, передать ссылку на соединение с сокетом и иметь возможность передавать данные в это соединение.К сожалению, мне очень трудно сделать эту работу!Это гораздо проще в Python и JS.

См. Код ниже, который в основном является примером кода, который я связал выше.В месте, где я вызываю Stream.emit (...), как я могу передать ссылку на этот "toClient" и по-прежнему передавать его?Если я передаю экземпляр toClient в экземпляр класса, он, похоже, не работает.

case GET -> Root / "ws" =>
      val toClient: Stream[F, WebSocketFrame] = Stream.emit(Text("How can I do this from a class instance?"))
      val fromClient: Sink[F, WebSocketFrame] = _.evalMap { (ws: WebSocketFrame) =>
        ws match {
          case Text(t, _) => F.delay(println(t))
          case f => F.delay(println(s"Unknown type: $f"))
        }
      }
      WebSocketBuilder[F].build(toClient, fromClient)

1 Ответ

0 голосов
/ 02 июня 2019

Вы можете использовать MVar для потоковой связи с веб-сокетом.

Вот пример использования Cats IO Effect:

final class WebSocketServer(implicit timer: Timer[IO]) extends Http4sDsl[IO] {

  implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def start: IO[ExitCode] = {
    BlazeServerBuilder[IO]
      .bindHttp(8080)
      .withWebSockets(true)
      .withHttpApp(routes.orNotFound)
      .resource
      .use(_ => IO.never)
      .as(ExitCode.Success)
  }

  private[this] val routes: HttpRoutes[IO] = HttpRoutes.of[IO] {
    case GET -> Root / "ws" => {
      for {
        channel <- cats.effect.concurrent.MVar[IO].empty[List[WebSocketFrame]]
        webSocket <- {
          WebSocketBuilder[IO].build(
            send = fs2.Stream
              .eval(channel.take)
              .flatMap(fs2.Stream.emits(_))
              .repeat,
            receive = stream => {
              stream.evalMap {
                case Text(data, _)   => channel.put(List(Text("pong")))
                case unknown         => IO(println(s"Unknown type: $unknown"))
              }
            }
          )
        }
      } yield webSocket
    }
  }
}

Если вы хотитечтобы отправить сообщение обратно клиенту, вы должны поместить его в MVar.

channel.put(List(Text("pong")))

Интересная часть - это повторяющийся поток, который опрашивает MVar для новых сообщений, чтобы отправить их обратно клиенту WebSocket..

fs2.Stream.eval(channel.take).flatMap(fs2.Stream.emits(_).repeat
...