Одновременный эффект «появления» в конечной точке WebSocket - PullRequest
4 голосов
/ 13 мая 2019

У меня есть следующий код:

class ApiRoutes2[F[_]](implicit F: ConcurrentEffect[F]) extends Http4sDsl[F] {
  var queue = Queue.bounded[F, String](100)

  def createService(queue: Queue[F, String]): F[Unit] = ???

  val service: HttpRoutes[F] = HttpRoutes.of[F] {
    case PUT -> Root / "services" =>
      val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
      val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
        case Text(t, _) => F.delay(println(t))
        case f => F.delay(println(s"Unknown type: $f"))
      }

      // How to "spawn" createService?

      toClientF.flatMap { toClient =>
        WebSocketBuilder[F].build(toClient, fromClient)
      }
  }
}

createService - это функция, которая создает новый сервис.Создание нового сервиса - очень сложный процесс, он включает в себя запуск конвейеров CI, ожидание их завершения и последующий запуск нескольких конвейеров CI таким же образом.Полученная очередь будет использоваться для сообщения браузеру о текущих выполняемых операциях.

Я хочу одновременно «породить» createService и позволить ему работать до его завершения.Однако в то же время я хочу немедленно вернуть WebSocket клиенту.Ака, я не могу заблокировать при "порождении" createService.

Я застрял.Я могу думать только об использовании shift, но это будет означать, что следующая строка в for-compition будет блокировать ожидание завершения createService только для того, чтобы затем вернуть веб-сокет клиенту.

Мой подход неверен?Что я делаю не так?

1 Ответ

3 голосов
/ 14 мая 2019

Поскольку F является экземпляром ConcurrentEffect, у вас также есть экземпляр Concurrent.

Поэтому вы можете использовать Concurrent[F].start, который возвращает Fiber к действующей операции (вы можете просто игнорировать Fiber, если вам не нужно отменять / обеспечивать завершение, хотя).

  val service: HttpRoutes[F] = HttpRoutes.of[F] {
    case PUT -> Root / "services" =>
      val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
      val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
        case Text(t, _) => F.delay(println(t))
        case f => F.delay(println(s"Unknown type: $f"))
      }

      for {
        toClient <- toClientF
        _ <- Concurrent[F].start(createService)
        websocket <- WebSocketBuilder[F].build(toClient, fromClient)
      } yield websocket
  }
...