Как абстрагироваться от эффектов и использовать ContextShift с Scala Cats? - PullRequest
1 голос
/ 28 марта 2020

Я создаю в Scala и Cats функцию, которая выполняет некоторые операции ввода-вывода и будет вызываться другими частями кода. Я также изучаю Cats и хочу, чтобы моя функция:

  • была обобщенной c по своему эффекту и использовала F[_]
  • Запуск в выделенном пуле потоков
  • Я хочу ввести асин c границы

Я предполагаю, что все мои функции обобщены c в F [_] вплоть до основного метода, потому что я пытаюсь следовать эти правила Cat

Но я изо всех сил пытаюсь заставить эти ограничения работать, используя ContextShift или ExecutionContext. Я написал полный пример здесь , и это отрывок из примера:

object ComplexOperation {
  // Thread pool for ComplexOperation internal use only
  val cs = IO.contextShift(
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
  )

  // Complex operation that takes resources and time
  def run[F[_]: Sync](input: String): F[String] =
    for {
      r1 <- Sync[F].delay(cs.shift) *> op1(input)
      r2 <- Sync[F].delay(cs.shift) *> op2(r1)
      r3 <- Sync[F].delay(cs.shift) *> op3(r2)
    } yield r3

  def op1[F[_]: Sync](input: String): F[Int]     = Sync[F].delay(input.length)
  def op2[F[_]: Sync](input: Int): F[Boolean]    = Sync[F].delay(input % 2 == 0)
  def op3[F[_]: Sync](input: Boolean): F[String] = Sync[F].delay(s"Complex result: $input")
}

Это явно не абстрагируется от эффектов, так как ComplexOperation.run требуется ContextShift[IO], чтобы быть в состоянии ввести асин c границы. Какой правильный (или лучший) способ сделать это?

Создание ContextShift[IO] внутри ComplexOperation.run делает функцию зависимой от IO, что мне не нужно. Перемещение создания ContextShift[IO] на вызывающего просто переместит проблему: вызывающий также является обобщенным c в F[_], так как он может получить ContextShift[IO] для передачи на ComplexOperation.run без явной зависимости от IO?

Помните, что я не хочу использовать один глобальный ContextShift[IO], определенный на самом верхнем уровне, но я хочу, чтобы каждый компонент решал сам.

Если мой ComplexOperation.run создаст ContextShift[IO] или это обязанность звонящего?

По крайней мере, я правильно делаю? Или я иду против стандартных практик?

1 Ответ

3 голосов
/ 28 марта 2020

Итак, я взял на себя смелость переписать ваш код, надеюсь, он поможет:

import cats.effect._

object Functions {
  def sampleFunction[F[_]: Sync : ContextShift](file: String, blocker: Blocker): F[String] = {
    val handler: Resource[F, Int] =
      Resource.make(
        blocker.blockOn(openFile(file))
      ) { handler =>
        blocker.blockOn(closeFile(handler))
      }

    handler.use(handler => doWork(handler))
  }

  private def openFile[F[_]: Sync](file: String): F[Int] = Sync[F].delay {
    println(s"Opening file $file with handler 2")
    2
  }

  private def closeFile[F[_]: Sync](handler: Int): F[Unit] = Sync[F].delay {
    println(s"Closing file handler $handler")
  }

  private def doWork[F[_]: Sync](handler: Int): F[String] = Sync[F].delay {
    println(s"Calculating the value on file handler $handler")
    "The final value" 
  }
}

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val result = Blocker[IO].use { blocker =>
      Functions.sampleFunction[IO](file = "filePath", blocker)
    }

    for {
      data <- result
      _ <- IO(println(data))
    } yield ExitCode.Success
  }
}

Вы можете видеть, что он работает здесь .

Итак, что делает этот код.
Во-первых, он создает Ресурс для файла, поскольку close должен быть выполнен, даже по гарантии или при сбое.
Он использует Blocker для запуска операций open и close в потоке блокировки poo (это делается с помощью ContextShift ) .
Наконец, на main, он создает по умолчанию Blocker , например, для ** IO *, и использует его для вызова вашей функции; и печатает результат.

Не стесняйтесь задавать любые вопросы.

...