Как говорит Reactormonk в комментарии выше, невозможно написать экземпляр Async
для Future
, который имеет правильную семантику, потому что Async
расширяет Sync
, а Sync
требует представления вычисления, которые можно запускать повторно, в то время как фьючерсы Scala начинают работать, когда они определены и не могут быть перезапущены.
Незаконный экземпляр
Поучительно убедиться в этом самим, и я бы посоветовал вам попытаться написать свой собственный компилируемый, но (обязательно) незаконный Async[Future]
экземпляр, не глядя на следующий блок кода. Для примера приведу краткий набросок моей головы:
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import cats.effect.{Async, ExitCase, IO}
def futureAsync(implicit c: ExecutionContext): Async[Future] = new Async[Future] {
def async[A](k: (Either[Throwable, A] => Unit) => Unit): Future[A] =
IO.async(k).unsafeToFuture()
def asyncF[A](k: (Either[Throwable, A] => Unit) => Future[Unit]): Future[A] = {
val p = Promise[A]()
val f = k {
case Right(a) => p.success(a)
case Left(e) => p.failure(e)
}
f.flatMap(_ => p.future)
}
def suspend[A](thunk: => Future[A]): Future[A] = Future(thunk).flatten
def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
release: (A, ExitCase[Throwable]) => Future[Unit]
): Future[B] = acquire.flatMap { a =>
use(a).transformWith {
case Success(b) => release(a, ExitCase.Completed).map(_ => b)
case Failure(e) => release(a, ExitCase.Error(e)).flatMap(_ => Future.failed(e))
}
}
def raiseError[A](e: Throwable): Future[A] = Future.failed(e)
def handleErrorWith[A](fa: Future[A])(f: Throwable => Future[A]): Future[A] =
fa.recoverWith { case t => f(t) }
def pure[A](x: A): Future[A] = Future.successful(x)
def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)
def tailRecM[A, B](a: A)(f: A => Future[Either[A, B]]): Future[B] = f(a).flatMap {
case Right(b) => Future.successful(b)
case Left(a) => tailRecM(a)(f)
}
}
Это скомпилируется просто отлично и, вероятно, будет работать в некоторых ситуациях (но, пожалуйста, не используйте его на самом деле!). Мы сказали, что у него не может быть правильной семантики, и мы можем показать это, используя модуль законов кошачьего эффекта.
Проверка законов
Для начала нам понадобятся некоторые вещи, о которых вам не нужно беспокоиться:
import cats.kernel.Eq, cats.implicits._
import org.scalacheck.Arbitrary
implicit val throwableEq: Eq[Throwable] = Eq.by[Throwable, String](_.toString)
implicit val nonFatalArbitrary: Arbitrary[Throwable] =
Arbitrary(Arbitrary.arbitrary[Exception].map(identity))
implicit def futureEq[A](implicit A: Eq[A], ec: ExecutionContext): Eq[Future[A]] =
new Eq[Future[A]] {
private def liftToEither(f: Future[A]): Future[Either[Throwable, A]] =
f.map(Right(_)).recover { case e => Left(e) }
def eqv(fx: Future[A], fy: Future[A]): Boolean =
scala.concurrent.Await.result(
liftToEither(fx).zip(liftToEither(fy)).map {
case (rx, ry) => rx === ry
},
scala.concurrent.duration.Duration(1, "second")
)
}
Затем мы можем определить тест, который проверяет законы Async
для нашего экземпляра:
import cats.effect.laws.discipline.{AsyncTests, Parameters}
import org.scalatest.FunSuite
import org.typelevel.discipline.scalatest.Discipline
object FutureAsyncSuite extends FunSuite with Discipline {
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val params: Parameters =
Parameters.default.copy(allowNonTerminationLaws = false)
checkAll(
"Async",
AsyncTests[Future](futureAsync).async[String, String, String]
)
}
И тогда мы можем запустить тесты закона:
scala> FutureAsyncSuite.execute()
FutureAsyncSuite:
- Async.async.acquire and release of bracket are uncancelable
- Async.async.ap consistent with product + map
- Async.async.applicative homomorphism
...
Вы увидите, что большинство тестов зеленые; в этом случае многое получается правильно.
Где это нарушает закон
Он показывает три неудачных теста, в том числе следующие:
- Async.async.repeated sync evaluation not memoized *** FAILED ***
GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
(Discipline.scala:14)
Falsified after 1 successful property evaluations.
Location: (Discipline.scala:14)
Occurred when passed generated values (
arg0 = "淳칇멀",
arg1 = org.scalacheck.GenArities$$Lambda$7154/1834868832@1624ea25
)
Label of failing property:
Expected: Future(Success(驅ṇ숆㽝珅뢈矉))
Received: Future(Success(淳칇멀))
Если вы посмотрите на определения законов , вы увидите, что это тест, который определяет значение Future
с помощью delay
, а затем упорядочивает его несколько раз, например:
val change = F.delay { /* observable side effect here */ }
val read = F.delay(cur)
change *> change *> read
Два других сбоя являются аналогичными «незарегистрированными» нарушениями. Эти тесты должны увидеть, что побочный эффект происходит дважды, но в нашем случае невозможно написать delay
или suspend
для Future
таким образом, что это произойдет (хотя стоит попытаться убедить себя, что это это дело).
Что вы должны сделать вместо этого
Подводя итог: вы можете написать экземпляр Async[Future]
, который пройдет что-то вроде 75 из тестов 78 Async
законов, но невозможно написать экземпляр, который пройдет все из них, и использовать незаконный экземпляр действительно плохая идея: и потенциальные пользователи вашего кода, и библиотеки, такие как Doobie, будут считать, что ваши экземпляры законны, и если вы не соответствуете этому предположению, вы открываете дверь для сложных и раздражающих ошибок.
Стоит отметить, что не так уж сложно написать минимальную оболочку для Future
, которая имеет законный Async
экземпляр (например, у меня есть оболочка для будущего Twitter под названием Rerunnable
в моем catbird библиотека). Вы действительно должны просто придерживаться cats.effect.IO
и использовать предоставленные преобразования для преобразования в и из фьючерсов в любых частях вашего кода, где вы работаете с традиционными Future
API-интерфейсами.