Как создать Async [Future] из Async [IO] - PullRequest
6 голосов
/ 03 мая 2019

Я пытаюсь неявно добавить Async и Sync в мой код для хранилища doobie.Синхронизация и асинхронность [F] работают нормально.Я хочу преобразовать их в будущее и сталкиваюсь с проблемой

Я пытался создать свой собственный Aync из IO

def futureAsync(implicit F: MonadError[Future, Throwable]): Async[Future] = new Async[Future] {
    override def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] = IO.async(k).unsafeToFuture()

    override def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] =
      throw new Exception("Not implemented Future.asyncF")

    override def suspend[A](thunk: => Future[A]): Future[A] = thunk

    override def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(release: (A, ExitCase[Throwable]) => Future[Unit]): Future[B] =
      throw new Exception("Not implemented Future.bracketCase")

    override def raiseError[A](e: Throwable): Future[A] = F.raiseError(e)

    override def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] = F.handleErrorWith(fa)(_ => f(new Exception("")))

    override def pure[A](x: A): Future[A] = F.pure(x)

    override def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = F.flatMap(fa)(f)

    override def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = F.tailRecM(a)(f)
  }

Я поражен реализацией двух функций там asyncF и BracketCase Может некоторыеодна помощь?

1 Ответ

12 голосов
/ 03 мая 2019

Как говорит Reactormonk в комментарии выше, невозможно написать экземпляр Async для Future, который имеет правильную семантику, потому что Async расширяет Sync, а Sync требует представления вычисления, которые можно запускать повторно, в то время как фьючерсы Scala начинают работать, когда они определены и не могут быть перезапущены.

Незаконный экземпляр

Поучительно убедиться в этом самим, и я бы посоветовал вам попытаться написать свой собственный компилируемый, но (обязательно) незаконный Async[Future] экземпляр, не глядя на следующий блок кода. Для примера приведу краткий набросок моей головы:

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import cats.effect.{Async, ExitCase, IO}

def futureAsync(implicit c: ExecutionContext): Async[Future] = new Async[Future] {
  def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] =
    IO.async(k).unsafeToFuture()

  def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] = {
    val p = Promise[A]()
    val f = k {
      case Right(a) => p.success(a)
      case Left(e) => p.failure(e)
    }
    f.flatMap(_ => p.future)
  }

  def suspend[A](thunk: => Future[A]): Future[A] = Future(thunk).flatten

  def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
    release: (A, ExitCase[Throwable]) => Future[Unit]
  ): Future[B] = acquire.flatMap { a =>
    use(a).transformWith {
      case Success(b) => release(a, ExitCase.Completed).map(_ => b)
      case Failure(e) => release(a, ExitCase.Error(e)).flatMap(_ => Future.failed(e))
    }
  }

  def raiseError[A](e: Throwable): Future[A] = Future.failed(e)
  def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] =
    fa.recoverWith { case t => f(t) }

  def pure[A](x: A): Future[A] = Future.successful(x)
  def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)
  def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = f(a).flatMap {
    case Right(b) => Future.successful(b)
    case Left(a) => tailRecM(a)(f)
  }
}

Это скомпилируется просто отлично и, вероятно, будет работать в некоторых ситуациях (но, пожалуйста, не используйте его на самом деле!). Мы сказали, что у него не может быть правильной семантики, и мы можем показать это, используя модуль законов кошачьего эффекта.

Проверка законов

Для начала нам понадобятся некоторые вещи, о которых вам не нужно беспокоиться:

import cats.kernel.Eq, cats.implicits._
import org.scalacheck.Arbitrary

implicit val throwableEq: Eq[Throwable] =  Eq.by[Throwable, String](_.toString)
implicit val nonFatalArbitrary: Arbitrary[Throwable] =
  Arbitrary(Arbitrary.arbitrary[Exception].map(identity))

implicit def futureEq[A](implicit A: Eq[A], ec: ExecutionContext): Eq[Future[A]] =
  new Eq[Future[A]] {
    private def liftToEither(f: Future[A]): Future[Either[Throwable, A]] =
      f.map(Right(_)).recover { case e => Left(e) }

      def eqv(fx: Future[A], fy: Future[A]): Boolean =
        scala.concurrent.Await.result(
        liftToEither(fx).zip(liftToEither(fy)).map {
          case (rx, ry) => rx === ry
        },
        scala.concurrent.duration.Duration(1, "second")
      )
  }

Затем мы можем определить тест, который проверяет законы Async для нашего экземпляра:

import cats.effect.laws.discipline.{AsyncTests, Parameters}
import org.scalatest.FunSuite
import org.typelevel.discipline.scalatest.Discipline

object FutureAsyncSuite extends FunSuite with Discipline {
  implicit val ec: ExecutionContext = ExecutionContext.global

  implicit val params: Parameters =
    Parameters.default.copy(allowNonTerminationLaws = false)

  checkAll(
    "Async",
    AsyncTests[Future](futureAsync).async[String, String, String]
  )
}

И тогда мы можем запустить тесты закона:

scala> FutureAsyncSuite.execute()
FutureAsyncSuite:
- Async.async.acquire and release of bracket are uncancelable
- Async.async.ap consistent with product + map
- Async.async.applicative homomorphism
...

Вы увидите, что большинство тестов зеленые; в этом случае многое получается правильно.

Где это нарушает закон

Он показывает три неудачных теста, в том числе следующие:

- Async.async.repeated sync evaluation not memoized *** FAILED ***
  GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
   (Discipline.scala:14)
    Falsified after 1 successful property evaluations.
    Location: (Discipline.scala:14)
    Occurred when passed generated values (
      arg0 = "淳칇멀",
      arg1 = org.scalacheck.GenArities$$Lambda$7154/1834868832@1624ea25
    )
    Label of failing property:
      Expected: Future(Success(驅ṇ숆㽝珅뢈矉))
  Received: Future(Success(淳칇멀))

Если вы посмотрите на определения законов , вы увидите, что это тест, который определяет значение Future с помощью delay, а затем упорядочивает его несколько раз, например:

val change = F.delay { /* observable side effect here */ }
val read = F.delay(cur)

change *> change *> read

Два других сбоя являются аналогичными «незарегистрированными» нарушениями. Эти тесты должны увидеть, что побочный эффект происходит дважды, но в нашем случае невозможно написать delay или suspend для Future таким образом, что это произойдет (хотя стоит попытаться убедить себя, что это это дело).

Что вы должны сделать вместо этого

Подводя итог: вы можете написать экземпляр Async[Future], который пройдет что-то вроде 75 из тестов 78 Async законов, но невозможно написать экземпляр, который пройдет все из них, и использовать незаконный экземпляр действительно плохая идея: и потенциальные пользователи вашего кода, и библиотеки, такие как Doobie, будут считать, что ваши экземпляры законны, и если вы не соответствуете этому предположению, вы открываете дверь для сложных и раздражающих ошибок.

Стоит отметить, что не так уж сложно написать минимальную оболочку для Future, которая имеет законный Async экземпляр (например, у меня есть оболочка для будущего Twitter под названием Rerunnable в моем catbird библиотека). Вы действительно должны просто придерживаться cats.effect.IO и использовать предоставленные преобразования для преобразования в и из фьючерсов в любых частях вашего кода, где вы работаете с традиционными Future API-интерфейсами.

...