У меня есть следующий код:
class ApiRoutes2[F[_]](implicit F: ConcurrentEffect[F]) extends Http4sDsl[F] {
var queue = Queue.bounded[F, String](100)
def createService(queue: Queue[F, String]): F[Unit] = ???
val service: HttpRoutes[F] = HttpRoutes.of[F] {
case PUT -> Root / "services" =>
val toClientF: F[Stream[F, WebSocketFrame]] = queue.map(_.dequeue.map(t => Text(t)))
val fromClient: Pipe[F, WebSocketFrame, Unit] = _.evalMap {
case Text(t, _) => F.delay(println(t))
case f => F.delay(println(s"Unknown type: $f"))
}
// How to "spawn" createService?
toClientF.flatMap { toClient =>
WebSocketBuilder[F].build(toClient, fromClient)
}
}
}
createService
- это функция, которая создает новый сервис.Создание нового сервиса - очень сложный процесс, он включает в себя запуск конвейеров CI, ожидание их завершения и последующий запуск нескольких конвейеров CI таким же образом.Полученная очередь будет использоваться для сообщения браузеру о текущих выполняемых операциях.
Я хочу одновременно «породить» createService и позволить ему работать до его завершения.Однако в то же время я хочу немедленно вернуть WebSocket клиенту.Ака, я не могу заблокировать при "порождении" createService.
Я застрял.Я могу думать только об использовании shift
, но это будет означать, что следующая строка в for-compition будет блокировать ожидание завершения createService
только для того, чтобы затем вернуть веб-сокет клиенту.
Мой подход неверен?Что я делаю не так?